import subprocess
import time
+import hostapd
from wpasupplicant import WpaSupplicant
import hwsim_utils
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
dev.select_network(id)
+def set_mka_eap_config(dev, mka_priority=None, integ_only=False, port=None):
+ dev.set("eapol_version", "3")
+ dev.set("ap_scan", "0")
+ dev.set("fast_reauth", "1")
+
+ id = dev.add_network()
+ dev.set_network(id, "key_mgmt", "NONE")
+ dev.set_network(id, "eapol_flags", "0")
+ dev.set_network(id, "macsec_policy", "1")
+ if integ_only:
+ dev.set_network(id, "macsec_integ_only", "1")
+ if mka_priority is not None:
+ dev.set_network(id, "mka_priority", str(mka_priority))
+ if port is not None:
+ dev.set_network(id, "macsec_port", str(port))
+
+ dev.set_network(id, "key_mgmt", "IEEE8021X")
+ dev.set_network(id, "eap", "TTLS")
+ dev.set_network_quoted(id, "ca_cert", "auth_serv/ca.pem")
+ dev.set_network_quoted(id, "phase2", "auth=MSCHAPV2")
+ dev.set_network_quoted(id, "anonymous_identity", "ttls")
+ dev.set_network_quoted(id, "identity", "DOMAIN\mschapv2 user")
+ dev.set_network_quoted(id, "password", "password")
+
+ dev.select_network(id)
+
def log_ip_macsec():
cmd = subprocess.Popen(["ip", "macsec", "show"],
stdout=subprocess.PIPE,
return False
return False
-def wait_mka_done(wpa, expect_failure=False):
+def wait_mka_done(wpa, expect_failure=False, hostapd=False):
max_iter = 14 if expect_failure else 40
for i in range(max_iter):
done = True
for w in wpa:
secured = w.get_status_field("Secured")
- peers = int(w.get_status_field("live_peers"))
+ live_peers = w.get_status_field("live_peers")
+ peers = int(live_peers) if live_peers else 0
if expect_failure and (secured == "Yes" or peers > 0):
raise Exception("MKA completed unexpectedly")
- if peers != len(wpa) - 1 or secured != "Yes":
+ expect_peers = len(wpa) - 1
+ if hostapd:
+ expect_peers += 1
+ if peers != expect_peers or secured != "Yes":
done = False
break
w.dump_monitor()
if not done:
raise Exception("MKA not completed successfully")
+ if hostapd:
+ # TODO: check that hostapd is the key server
+ return
+
key_server = None
ks_prio = 999
for w in wpa:
wait_mka_done(wpa)
finally:
cleanup_macsec()
+
+def cleanup_macsec_hostapd():
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
+ wpas.interface_remove("veth0")
+ del wpas
+ hapd = hostapd.HostapdGlobal()
+ hapd.remove('veth1')
+ subprocess.call(["ip", "link", "del", "veth0"],
+ stderr=open('/dev/null', 'w'))
+ log_ip_link()
+
+def test_macsec_hostapd_psk(dev, apdev, params):
+ """MACsec PSK with hostapd"""
+ try:
+ run_macsec_hostapd_psk(dev, apdev, params, "macsec_hostapd_psk")
+ finally:
+ cleanup_macsec_hostapd()
+
+def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
+ port0=None, port1=None, ckn0=None, ckn1=None,
+ cak0=None, cak1=None, expect_failure=False):
+ add_veth()
+
+ cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
+ cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
+ cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
+ cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")
+
+ for i in range(2):
+ subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
+
+ cmd = {}
+ cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
+ '-w', cap_veth0, '-s', '2000',
+ '--immediate-mode'],
+ stderr=open('/dev/null', 'w'))
+ cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
+ '-w', cap_veth1, '-s', '2000',
+ '--immediate-mode'],
+ stderr=open('/dev/null', 'w'))
+
+ wpa = add_wpas_interfaces(count=1)
+ wpas0 = wpa[0]
+
+ set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0,
+ cak=cak0, mka_priority=100)
+
+ if cak1 is None:
+ cak1 = "000102030405060708090a0b0c0d0e0f"
+ if ckn1 is None:
+ ckn1 = "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
+ params = {"driver": "macsec_linux",
+ "interface": "veth1",
+ "eapol_version": "3",
+ "mka_cak": cak1,
+ "mka_ckn": ckn1,
+ "macsec_policy": "1",
+ "mka_priority": "1"}
+ if integ_only:
+ params["macsec_integ_only"] = "1"
+ if port1 is not None:
+ params["macsec_port"] = str(port1)
+ apdev = {'ifname': 'veth1'}
+ try:
+ hapd = hostapd.add_ap(apdev, params, driver="macsec_linux")
+ except:
+ raise HwsimSkip("No CONFIG_MACSEC=y in hostapd")
+
+ log_ip_macsec()
+ log_ip_link()
+
+ logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
+ logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
+
+ wait_mka_done(wpa, expect_failure=expect_failure, hostapd=True)
+ log_ip_link()
+
+ if expect_failure:
+ for i in range(len(cmd)):
+ cmd[i].terminate()
+ return
+
+ macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
+ macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
+
+ cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
+ '-w', cap_macsec0, '-s', '2000',
+ '--immediate-mode'],
+ stderr=open('/dev/null', 'w'))
+ cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
+ '-w', cap_macsec1, '-s', '2000',
+ '--immediate-mode'],
+ stderr=open('/dev/null', 'w'))
+ time.sleep(0.5)
+
+ logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
+ logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
+ log_ip_macsec()
+ hwsim_utils.test_connectivity(wpas0, hapd,
+ ifname1=macsec_ifname0,
+ ifname2=macsec_ifname1,
+ send_len=1400)
+ log_ip_macsec()
+
+ time.sleep(1)
+ for i in range(len(cmd)):
+ cmd[i].terminate()
+
+def test_macsec_hostapd_eap(dev, apdev, params):
+ """MACsec EAP with hostapd"""
+ try:
+ run_macsec_hostapd_eap(dev, apdev, params, "macsec_hostapd_eap")
+ finally:
+ cleanup_macsec_hostapd()
+
+def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
+ port0=None, port1=None, expect_failure=False):
+ add_veth()
+
+ cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
+ cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
+ cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
+ cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")
+
+ for i in range(2):
+ subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
+
+ cmd = {}
+ cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
+ '-w', cap_veth0, '-s', '2000',
+ '--immediate-mode'],
+ stderr=open('/dev/null', 'w'))
+ cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
+ '-w', cap_veth1, '-s', '2000',
+ '--immediate-mode'],
+ stderr=open('/dev/null', 'w'))
+
+ wpa = add_wpas_interfaces(count=1)
+ wpas0 = wpa[0]
+
+ set_mka_eap_config(wpas0, integ_only=integ_only, port=port0,
+ mka_priority=100)
+
+ params = {"driver": "macsec_linux",
+ "interface": "veth1",
+ "eapol_version": "3",
+ "macsec_policy": "1",
+ "mka_priority": "1",
+ "ieee8021x": "1",
+ "auth_server_addr": "127.0.0.1",
+ "auth_server_port": "1812",
+ "auth_server_shared_secret": "radius",
+ "nas_identifier": "nas.w1.fi"}
+ if integ_only:
+ params["macsec_integ_only"] = "1"
+ if port1 is not None:
+ params["macsec_port"] = str(port1)
+ apdev = {'ifname': 'veth1'}
+ try:
+ hapd = hostapd.add_ap(apdev, params, driver="macsec_linux")
+ except:
+ raise HwsimSkip("No CONFIG_MACSEC=y in hostapd")
+
+ log_ip_macsec()
+ log_ip_link()
+
+ logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
+ logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
+
+ wait_mka_done(wpa, expect_failure=expect_failure, hostapd=True)
+ log_ip_link()
+
+ if expect_failure:
+ for i in range(len(cmd)):
+ cmd[i].terminate()
+ return
+
+ macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
+ macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
+
+ cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
+ '-w', cap_macsec0, '-s', '2000',
+ '--immediate-mode'],
+ stderr=open('/dev/null', 'w'))
+ cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
+ '-w', cap_macsec1, '-s', '2000',
+ '--immediate-mode'],
+ stderr=open('/dev/null', 'w'))
+ time.sleep(0.5)
+
+ logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
+ logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
+ log_ip_macsec()
+ hwsim_utils.test_connectivity(wpas0, hapd,
+ ifname1=macsec_ifname0,
+ ifname2=macsec_ifname1,
+ send_len=1400)
+ log_ip_macsec()
+
+ time.sleep(1)
+ for i in range(len(cmd)):
+ cmd[i].terminate()