if ev is not None:
raise Exception("Deauth imminent notice without PMF accepted")
-def test_ap_hs20_remediation_required(dev, apdev):
- """Hotspot 2.0 connection and remediation required from RADIUS"""
- check_eap_capa(dev[0], "MSCHAPV2")
- try:
- _test_ap_hs20_remediation_required(dev, apdev)
- finally:
- dev[0].request("SET pmf 0")
-
-def _test_ap_hs20_remediation_required(dev, apdev):
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['nai_realm'] = ["0,example.com,21[2:4]"]
- hostapd.add_ap(apdev[0], params)
-
- dev[0].request("SET pmf 1")
- dev[0].hs20_enable()
- dev[0].add_cred_values({'realm': "example.com",
- 'username': "hs20-subrem-test",
- 'password': "password"})
- interworking_select(dev[0], bssid, freq="2412")
- interworking_connect(dev[0], bssid, "TTLS")
- ev = dev[0].wait_event(["HS20-SUBSCRIPTION-REMEDIATION"], timeout=5)
- if ev is None:
- raise Exception("Timeout on subscription remediation notice")
- if " 1 https://example.com/" not in ev:
- raise Exception("Unexpected subscription remediation event contents")
-
-def test_ap_hs20_remediation_required_ctrl(dev, apdev):
- """Hotspot 2.0 connection and subrem from ctrl_iface"""
- check_eap_capa(dev[0], "MSCHAPV2")
- try:
- _test_ap_hs20_remediation_required_ctrl(dev, apdev)
- finally:
- dev[0].request("SET pmf 0")
-
-def _test_ap_hs20_remediation_required_ctrl(dev, apdev):
- bssid = apdev[0]['bssid']
- addr = dev[0].own_addr()
- params = hs20_ap_params()
- params['nai_realm'] = ["0,example.com,21[2:4]"]
- hapd = hostapd.add_ap(apdev[0], params)
-
- dev[0].request("SET pmf 1")
- dev[0].hs20_enable()
- dev[0].add_cred_values(default_cred())
- interworking_select(dev[0], bssid, freq="2412")
- interworking_connect(dev[0], bssid, "TTLS")
-
- hapd.wait_sta()
- hapd.request("HS20_WNM_NOTIF " + addr + " https://example.com/")
- ev = dev[0].wait_event(["HS20-SUBSCRIPTION-REMEDIATION"], timeout=5)
- if ev is None:
- raise Exception("Timeout on subscription remediation notice")
- if " 1 https://example.com/" not in ev:
- raise Exception("Unexpected subscription remediation event contents")
-
- hapd.request("HS20_WNM_NOTIF " + addr)
- ev = dev[0].wait_event(["HS20-SUBSCRIPTION-REMEDIATION"], timeout=5)
- if ev is None:
- raise Exception("Timeout on subscription remediation notice")
- if not ev.endswith("HS20-SUBSCRIPTION-REMEDIATION "):
- raise Exception("Unexpected subscription remediation event contents: " + ev)
-
- if "FAIL" not in hapd.request("HS20_WNM_NOTIF "):
- raise Exception("Unexpected HS20_WNM_NOTIF success")
- if "FAIL" not in hapd.request("HS20_WNM_NOTIF foo"):
- raise Exception("Unexpected HS20_WNM_NOTIF success")
- if "FAIL" not in hapd.request("HS20_WNM_NOTIF " + addr + " https://12345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678927.very.long.example.com/"):
- raise Exception("Unexpected HS20_WNM_NOTIF success")
- if "OK" not in hapd.request("HS20_WNM_NOTIF " + addr + " "):
- raise Exception("HS20_WNM_NOTIF failed with empty URL")
-
def test_ap_hs20_session_info(dev, apdev):
"""Hotspot 2.0 connection and session information from RADIUS"""
check_eap_capa(dev[0], "MSCHAPV2")
if ev is None:
raise Exception("Scan not completed")
-def test_ap_hs20_osen(dev, apdev):
- """Hotspot 2.0 OSEN connection"""
- params = {'ssid': "osen",
- 'osen': "1",
- 'auth_server_addr': "127.0.0.1",
- 'auth_server_port': "1812",
- 'auth_server_shared_secret': "radius"}
- hostapd.add_ap(apdev[0], params)
-
- dev[1].connect("osen", key_mgmt="NONE", scan_freq="2412",
- wait_connect=False)
- if "WEP40" in dev[2].get_capability("group"):
- dev[2].connect("osen", key_mgmt="NONE", wep_key0='"hello"',
- scan_freq="2412", wait_connect=False)
- dev[0].flush_scan_cache()
- dev[0].connect("osen", proto="OSEN", key_mgmt="OSEN", pairwise="CCMP",
- group="GTK_NOT_USED CCMP",
- eap="WFA-UNAUTH-TLS", identity="osen@example.com",
- ca_cert="auth_serv/ca.pem",
- scan_freq="2412")
- res = dev[0].get_bss(apdev[0]['bssid'])['flags']
- if "[OSEN-OSEN-CCMP]" not in res:
- raise Exception("OSEN not reported in BSS")
- if "[WEP]" in res:
- raise Exception("WEP reported in BSS")
- res = dev[0].request("SCAN_RESULTS")
- if "[OSEN-OSEN-CCMP]" not in res:
- raise Exception("OSEN not reported in SCAN_RESULTS")
-
- wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
- wpas.interface_add("wlan5", drv_params="force_connect_cmd=1")
- wpas.connect("osen", proto="OSEN", key_mgmt="OSEN", pairwise="CCMP",
- group="GTK_NOT_USED CCMP",
- eap="WFA-UNAUTH-TLS", identity="osen@example.com",
- ca_cert="auth_serv/ca.pem",
- scan_freq="2412")
- wpas.request("DISCONNECT")
-
-def test_ap_hs20_osen_single_ssid(dev, apdev):
- """Hotspot 2.0 OSEN-single-SSID connection"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['wpa_key_mgmt'] = "WPA-EAP OSEN"
- params['hessid'] = bssid
- hapd = hostapd.add_ap(apdev[0], params)
-
- dev[0].flush_scan_cache()
-
- # RSN-OSEN (for OSU)
- dev[0].connect("test-hs20", proto="OSEN", key_mgmt="OSEN", pairwise="CCMP",
- group="CCMP GTK_NOT_USED",
- eap="WFA-UNAUTH-TLS", identity="osen@example.com",
- ca_cert="auth_serv/ca.pem", ieee80211w='2',
- scan_freq="2412")
- # RSN-EAP (for data connection)
- dev[1].connect("test-hs20", key_mgmt="WPA-EAP", eap="TTLS",
- identity="hs20-test", password="password",
- ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2",
- pairwise="CCMP", group="CCMP",
- ieee80211w='2', scan_freq="2412")
-
- res = dev[0].get_bss(apdev[0]['bssid'])['flags']
- if "[WPA2-EAP+OSEN-CCMP]" not in res:
- raise Exception("OSEN not reported in BSS")
- if "[WEP]" in res:
- raise Exception("WEP reported in BSS")
- res = dev[0].request("SCAN_RESULTS")
- if "[WPA2-EAP+OSEN-CCMP]" not in res:
- raise Exception("OSEN not reported in SCAN_RESULTS")
-
- hwsim_utils.test_connectivity(dev[1], hapd)
- hwsim_utils.test_connectivity(dev[0], hapd, broadcast=False)
- hwsim_utils.test_connectivity(dev[0], hapd, timeout=1,
- success_expected=False)
-
def test_ap_hs20_network_preference(dev, apdev):
"""Hotspot 2.0 network selection with preferred home network"""
check_eap_capa(dev[0], "MSCHAPV2")
raise Exception("Unexpected SCAN command result")
dev[0].wait_connected(timeout=15)
-def test_ap_hs20_fetch_osu(dev, apdev):
- """Hotspot 2.0 OSU provider and icon fetch"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo.png"
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_icon'] = "w1fi_logo"
- params['osu_service_desc'] = ["eng:Example services", "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- hostapd.add_ap(apdev[0], params)
-
- bssid2 = apdev[1]['bssid']
- params = hs20_ap_params(ssid="test-hs20b")
- params['hessid'] = bssid2
- params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo.png"
- params['osu_ssid'] = '"HS 2.0 OSU OSEN"'
- params['osu_method_list'] = "0"
- params['osu_nai'] = "osen@example.com"
- params['osu_friendly_name'] = ["eng:Test2 OSU", "fin:Testi2-OSU"]
- params['osu_icon'] = "w1fi_logo"
- params['osu_service_desc'] = ["eng:Example services2", "fin:Esimerkkipalveluja2"]
- params['osu_server_uri'] = "https://example.org/osu/"
- hostapd.add_ap(apdev[1], params)
-
- with open("w1fi_logo.png", "rb") as f:
- orig_logo = f.read()
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch"
- if os.path.isdir(dir):
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- else:
- try:
- os.makedirs(dir)
- except:
- pass
- try:
- dev[1].scan_for_bss(bssid, freq="2412")
- dev[2].scan_for_bss(bssid, freq="2412")
- dev[0].request("SET osu_dir " + dir)
- dev[0].request("FETCH_OSU")
- if "FAIL" not in dev[1].request("HS20_ICON_REQUEST foo w1fi_logo"):
- raise Exception("Invalid HS20_ICON_REQUEST accepted")
- if "OK" not in dev[1].request("HS20_ICON_REQUEST " + bssid + " w1fi_logo"):
- raise Exception("HS20_ICON_REQUEST failed")
- if "OK" not in dev[2].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON failed")
- icons = 0
- while True:
- ev = dev[0].wait_event(["OSU provider fetch completed",
- "RX-HS20-ANQP-ICON"], timeout=15)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
- if "OSU provider fetch completed" in ev:
- break
- if "RX-HS20-ANQP-ICON" in ev:
- with open(ev.split(' ')[1], "rb") as f:
- logo = f.read()
- if logo == orig_logo:
- icons += 1
-
- with open(dir + "/osu-providers.txt", "r") as f:
- prov = f.read()
- logger.debug("osu-providers.txt: " + prov)
- if "OSU-PROVIDER " + bssid not in prov:
- raise Exception("Missing OSU_PROVIDER(1)")
- if "OSU-PROVIDER " + bssid2 not in prov:
- raise Exception("Missing OSU_PROVIDER(2)")
- finally:
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- os.rmdir(dir)
-
- if icons != 2:
- raise Exception("Unexpected number of icons fetched")
-
- ev = dev[1].wait_event(["GAS-QUERY-START"], timeout=5)
- if ev is None:
- raise Exception("Timeout on GAS-QUERY-DONE")
- ev = dev[1].wait_event(["GAS-QUERY-DONE"], timeout=5)
- if ev is None:
- raise Exception("Timeout on GAS-QUERY-DONE")
- if "freq=2412 status_code=0 result=SUCCESS" not in ev:
- raise Exception("Unexpected GAS-QUERY-DONE: " + ev)
- ev = dev[1].wait_event(["RX-HS20-ANQP"], timeout=15)
- if ev is None:
- raise Exception("Timeout on icon fetch")
- if "Icon Binary File" not in ev:
- raise Exception("Unexpected ANQP element")
-
- ev = dev[2].wait_event(["RX-HS20-ICON"], timeout=5)
- if ev is None:
- raise Exception("Timeout on RX-HS20-ICON")
- event_icon_len = ev.split(' ')[3]
- if " w1fi_logo " not in ev:
- raise Exception("RX-HS20-ICON did not have the expected file name")
- if bssid not in ev:
- raise Exception("RX-HS20-ICON did not have the expected BSSID")
- if "FAIL" in dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo 0 10"):
- raise Exception("GET_HS20_ICON 0..10 failed")
- if "FAIL" in dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo 5 10"):
- raise Exception("GET_HS20_ICON 5..15 failed")
- if "FAIL" not in dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo 100000 10"):
- raise Exception("Unexpected success of GET_HS20_ICON with too large offset")
- if "FAIL" not in dev[2].request("GET_HS20_ICON " + bssid + " no_such_logo 0 10"):
- raise Exception("GET_HS20_ICON for not existing icon succeeded")
- if "FAIL" not in dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo 0 3070"):
- raise Exception("GET_HS20_ICON with too many output bytes to fit the buffer succeeded")
- if "FAIL" not in dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo 0 0"):
- raise Exception("GET_HS20_ICON 0..0 succeeded")
- icon = b''
- pos = 0
- while True:
- if pos > 100000:
- raise Exception("Unexpectedly long icon")
- res = dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo %d 1000" % pos)
- if res.startswith("FAIL"):
- break
- icon += base64.b64decode(res)
- pos += 1000
- hex = binascii.hexlify(icon).decode()
- if not hex.startswith("0009696d6167652f706e677d1d"):
- raise Exception("Unexpected beacon binary header: " + hex)
- with open('w1fi_logo.png', 'rb') as f:
- data = f.read()
- if icon[13:] != data:
- raise Exception("Unexpected icon data")
- if len(icon) != int(event_icon_len):
- raise Exception("Unexpected RX-HS20-ICON event length: " + event_icon_len)
-
- for i in range(3):
- if "OK" not in dev[i].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON failed [2]")
- for i in range(3):
- ev = dev[i].wait_event(["RX-HS20-ICON"], timeout=5)
- if ev is None:
- raise Exception("Timeout on RX-HS20-ICON [2]")
-
- if "FAIL" not in dev[2].request("DEL_HS20_ICON foo w1fi_logo"):
- raise Exception("Invalid DEL_HS20_ICON accepted")
- if "OK" not in dev[2].request("DEL_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("DEL_HS20_ICON failed")
- if "OK" not in dev[1].request("DEL_HS20_ICON " + bssid):
- raise Exception("DEL_HS20_ICON failed")
- if "OK" not in dev[0].request("DEL_HS20_ICON "):
- raise Exception("DEL_HS20_ICON failed")
- for i in range(3):
- if "FAIL" not in dev[i].request("DEL_HS20_ICON "):
- raise Exception("DEL_HS20_ICON accepted when no icons left")
-
-def test_ap_hs20_fetch_osu_no_info(dev, apdev):
- """Hotspot 2.0 OSU provider and no AP with info"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- hostapd.add_ap(apdev[0], params)
-
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch"
- if os.path.isdir(dir):
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- else:
- try:
- os.makedirs(dir)
- except:
- pass
- dev[0].scan_for_bss(bssid, freq="2412")
- try:
- dev[0].request("SET osu_dir " + dir)
- dev[0].request("FETCH_OSU")
- ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
- finally:
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- os.rmdir(dir)
-
-def test_ap_hs20_fetch_osu_no_icon(dev, apdev):
- """Hotspot 2.0 OSU provider and no icon found"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo-no-file.png"
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_icon'] = "w1fi_logo"
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch"
- if os.path.isdir(dir):
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- else:
- try:
- os.makedirs(dir)
- except:
- pass
- dev[0].scan_for_bss(bssid, freq="2412")
- try:
- dev[0].request("SET osu_dir " + dir)
- dev[0].request("FETCH_OSU")
- ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
- finally:
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- os.rmdir(dir)
-
-def test_ap_hs20_fetch_osu_single_ssid(dev, apdev):
- """Hotspot 2.0 OSU provider and single SSID"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo-no-file.png"
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_nai2'] = "osen@example.com"
- params['osu_icon'] = "w1fi_logo"
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- params['wpa_key_mgmt'] = "WPA-EAP OSEN"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch"
- if os.path.isdir(dir):
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- else:
- try:
- os.makedirs(dir)
- except:
- pass
- dev[0].scan_for_bss(bssid, freq="2412")
- try:
- dev[0].request("SET osu_dir " + dir)
- dev[0].request("FETCH_OSU")
- ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
- osu_ssid = False
- osu_ssid2 = False
- osu_nai = False
- osu_nai2 = False
- with open(os.path.join(dir, "osu-providers.txt"), "r") as f:
- for l in f.readlines():
- logger.info(l.strip())
- if l.strip() == "osu_ssid=HS 2.0 OSU open":
- osu_ssid = True
- if l.strip() == "osu_ssid2=test-hs20":
- osu_ssid2 = True
- if l.strip().startswith("osu_nai="):
- osu_nai = True
- if l.strip() == "osu_nai2=osen@example.com":
- osu_nai2 = True
- if not osu_ssid:
- raise Exception("osu_ssid not reported")
- if not osu_ssid2:
- raise Exception("osu_ssid2 not reported")
- if osu_nai:
- raise Exception("osu_nai reported unexpectedly")
- if not osu_nai2:
- raise Exception("osu_nai2 not reported")
- finally:
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- os.rmdir(dir)
-
-def test_ap_hs20_fetch_osu_single_ssid2(dev, apdev):
- """Hotspot 2.0 OSU provider and single SSID (two OSU providers)"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo-no-file.png"
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_nai2'] = "osen@example.com"
- params['osu_icon'] = "w1fi_logo"
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- params['wpa_key_mgmt'] = "WPA-EAP OSEN"
- hapd = hostapd.add_ap(apdev[0], params, no_enable=True)
-
- hapd.set('osu_server_uri', 'https://another.example.com/osu/')
- hapd.set('osu_method_list', "1")
- hapd.set('osu_nai2', "osen@another.example.com")
- hapd.enable()
-
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch"
- if os.path.isdir(dir):
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- else:
- try:
- os.makedirs(dir)
- except:
- pass
- dev[0].scan_for_bss(bssid, freq="2412")
- try:
- dev[0].request("SET osu_dir " + dir)
- dev[0].request("FETCH_OSU")
- ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
- osu_ssid = False
- osu_ssid2 = False
- osu_nai = False
- osu_nai2 = False
- osu_nai2b = False
- with open(os.path.join(dir, "osu-providers.txt"), "r") as f:
- for l in f.readlines():
- logger.info(l.strip())
- if l.strip() == "osu_ssid=HS 2.0 OSU open":
- osu_ssid = True
- if l.strip() == "osu_ssid2=test-hs20":
- osu_ssid2 = True
- if l.strip().startswith("osu_nai="):
- osu_nai = True
- if l.strip() == "osu_nai2=osen@example.com":
- osu_nai2 = True
- if l.strip() == "osu_nai2=osen@another.example.com":
- osu_nai2b = True
- if not osu_ssid:
- raise Exception("osu_ssid not reported")
- if not osu_ssid2:
- raise Exception("osu_ssid2 not reported")
- if osu_nai:
- raise Exception("osu_nai reported unexpectedly")
- if not osu_nai2:
- raise Exception("osu_nai2 not reported")
- if not osu_nai2b:
- raise Exception("osu_nai2b not reported")
- finally:
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- os.rmdir(dir)
-
-def get_icon(dev, bssid, iconname):
- icon = b''
- pos = 0
- while True:
- if pos > 100000:
- raise Exception("Unexpectedly long icon")
- res = dev.request("GET_HS20_ICON " + bssid + " " + iconname + " %d 3000" % pos)
- if res.startswith("FAIL"):
- break
- icon += base64.b64decode(res)
- pos += 3000
- if len(icon) < 13:
- raise Exception("Too short GET_HS20_ICON response")
- return icon[0:13], icon[13:]
-
-def test_ap_hs20_req_hs20_icon(dev, apdev):
- """Hotspot 2.0 OSU provider and multi-icon fetch with REQ_HS20_ICON"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = ["128:80:zxx:image/png:w1fi_logo:w1fi_logo.png",
- "128:80:zxx:image/png:test_logo:auth_serv/sha512-server.pem"]
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_icon'] = ["w1fi_logo", "w1fi_logo2"]
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].scan_for_bss(bssid, freq="2412")
- run_req_hs20_icon(dev, bssid)
-
-def run_req_hs20_icon(dev, bssid):
- # First, fetch two icons from the AP to wpa_supplicant
-
- if "OK" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON failed")
- ev = dev[0].wait_event(["RX-HS20-ICON"], timeout=5)
- if ev is None:
- raise Exception("Timeout on RX-HS20-ICON (1)")
-
- if "OK" not in dev[0].request("REQ_HS20_ICON " + bssid + " test_logo"):
- raise Exception("REQ_HS20_ICON failed")
- ev = dev[0].wait_event(["RX-HS20-ICON"], timeout=5)
- if ev is None:
- raise Exception("Timeout on RX-HS20-ICON (2)")
-
- # Then, fetch the icons from wpa_supplicant for validation
-
- hdr, data1 = get_icon(dev[0], bssid, "w1fi_logo")
- hdr, data2 = get_icon(dev[0], bssid, "test_logo")
-
- with open('w1fi_logo.png', 'rb') as f:
- data = f.read()
- if data1 != data:
- raise Exception("Unexpected icon data (1)")
-
- with open('auth_serv/sha512-server.pem', 'rb') as f:
- data = f.read()
- if data2 != data:
- raise Exception("Unexpected icon data (2)")
-
- # Finally, delete the icons from wpa_supplicant
-
- if "OK" not in dev[0].request("DEL_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("DEL_HS20_ICON failed")
- if "OK" not in dev[0].request("DEL_HS20_ICON " + bssid + " test_logo"):
- raise Exception("DEL_HS20_ICON failed")
-
-def test_ap_hs20_req_operator_icon(dev, apdev):
- """Hotspot 2.0 operator icons"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = ["128:80:zxx:image/png:w1fi_logo:w1fi_logo.png",
- "500:300:fi:image/png:test_logo:auth_serv/sha512-server.pem"]
- params['operator_icon'] = ["w1fi_logo", "unknown_logo", "test_logo"]
- hostapd.add_ap(apdev[0], params)
-
- value = struct.pack('<HH', 128, 80) + b"zxx"
- value += struct.pack('B', 9) + b"image/png"
- value += struct.pack('B', 9) + b"w1fi_logo"
-
- value += struct.pack('<HH', 500, 300) + b"fi\0"
- value += struct.pack('B', 9) + b"image/png"
- value += struct.pack('B', 9) + b"test_logo"
-
- dev[0].scan_for_bss(bssid, freq="2412")
-
- if "OK" not in dev[0].request("ANQP_GET " + bssid + " hs20:12"):
- raise Exception("ANQP_GET command failed")
-
- ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
- if ev is None:
- raise Exception("GAS query start timed out")
-
- ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
- if ev is None:
- raise Exception("GAS query timed out")
-
- ev = dev[0].wait_event(["RX-HS20-ANQP"], timeout=1)
- if ev is None or "Operator Icon Metadata" not in ev:
- raise Exception("Did not receive Operator Icon Metadata")
-
- ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=10)
- if ev is None:
- raise Exception("ANQP-QUERY-DONE event not seen")
- if "result=SUCCESS" not in ev:
- raise Exception("Unexpected result: " + ev)
-
- bss = dev[0].get_bss(bssid)
- if "hs20_operator_icon_metadata" not in bss:
- raise Exception("hs20_operator_icon_metadata missing from BSS entry")
- if bss["hs20_operator_icon_metadata"] != binascii.hexlify(value).decode():
- raise Exception("Unexpected hs20_operator_icon_metadata value: " +
- bss["hs20_operator_icon_metadata"])
-
- run_req_hs20_icon(dev, bssid)
-
-def test_ap_hs20_req_hs20_icon_oom(dev, apdev):
- """Hotspot 2.0 icon fetch OOM with REQ_HS20_ICON"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = ["128:80:zxx:image/png:w1fi_logo:w1fi_logo.png",
- "128:80:zxx:image/png:test_logo:auth_serv/sha512-server.pem"]
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_icon'] = ["w1fi_logo", "w1fi_logo2"]
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].scan_for_bss(bssid, freq="2412")
-
- if "FAIL" not in dev[0].request("REQ_HS20_ICON 11:22:33:44:55:66 w1fi_logo"):
- raise Exception("REQ_HS20_ICON succeeded with unknown BSSID")
-
- with alloc_fail(dev[0], 1, "hs20_build_anqp_req;hs20_anqp_send_req"):
- if "FAIL" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON succeeded during OOM")
-
- with alloc_fail(dev[0], 1, "gas_query_req;hs20_anqp_send_req"):
- if "FAIL" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON succeeded during OOM")
-
- with alloc_fail(dev[0], 1, "=hs20_anqp_send_req"):
- if "FAIL" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON succeeded during OOM")
- with alloc_fail(dev[0], 2, "=hs20_anqp_send_req"):
- if "FAIL" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON succeeded during OOM")
-
- if "OK" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON failed")
- ev = dev[0].wait_event(["RX-HS20-ICON"], timeout=5)
- if ev is None:
- raise Exception("Timeout on RX-HS20-ICON (1)")
-
- with alloc_fail(dev[0], 1, "hs20_get_icon"):
- if "FAIL" not in dev[0].request("GET_HS20_ICON " + bssid + "w1fi_logo 0 100"):
- raise Exception("GET_HS20_ICON succeeded during OOM")
-
- if "OK" not in dev[0].request("DEL_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("DEL_HS20_ICON failed")
-
- with alloc_fail(dev[0], 1, "=hs20_process_icon_binary_file"):
- if "OK" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON failed")
- wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
-
-def test_ap_hs20_req_hs20_icon_parallel(dev, apdev):
- """Hotspot 2.0 OSU provider and multi-icon parallel fetch with REQ_HS20_ICON"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = ["128:80:zxx:image/png:w1fi_logo:w1fi_logo.png",
- "128:80:zxx:image/png:test_logo:auth_serv/sha512-server.pem"]
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_icon'] = ["w1fi_logo", "w1fi_logo2"]
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].scan_for_bss(bssid, freq="2412")
-
- # First, fetch two icons from the AP to wpa_supplicant
-
- if "OK" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("REQ_HS20_ICON failed")
-
- if "OK" not in dev[0].request("REQ_HS20_ICON " + bssid + " test_logo"):
- raise Exception("REQ_HS20_ICON failed")
- ev = dev[0].wait_event(["RX-HS20-ICON"], timeout=5)
- if ev is None:
- raise Exception("Timeout on RX-HS20-ICON (1)")
- ev = dev[0].wait_event(["RX-HS20-ICON"], timeout=5)
- if ev is None:
- raise Exception("Timeout on RX-HS20-ICON (2)")
-
- # Then, fetch the icons from wpa_supplicant for validation
-
- hdr, data1 = get_icon(dev[0], bssid, "w1fi_logo")
- hdr, data2 = get_icon(dev[0], bssid, "test_logo")
-
- with open('w1fi_logo.png', 'rb') as f:
- data = f.read()
- if data1 != data:
- raise Exception("Unexpected icon data (1)")
-
- with open('auth_serv/sha512-server.pem', 'rb') as f:
- data = f.read()
- if data2 != data:
- raise Exception("Unexpected icon data (2)")
-
- # Finally, delete the icons from wpa_supplicant
-
- if "OK" not in dev[0].request("DEL_HS20_ICON " + bssid + " w1fi_logo"):
- raise Exception("DEL_HS20_ICON failed")
- if "OK" not in dev[0].request("DEL_HS20_ICON " + bssid + " test_logo"):
- raise Exception("DEL_HS20_ICON failed")
-
-def test_ap_hs20_fetch_osu_stop(dev, apdev):
- """Hotspot 2.0 OSU provider fetch stopped"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo.png"
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_icon'] = "w1fi_logo"
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- hapd = hostapd.add_ap(apdev[0], params)
-
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch"
- if os.path.isdir(dir):
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- else:
- try:
- os.makedirs(dir)
- except:
- pass
- try:
- dev[0].request("SET osu_dir " + dir)
- dev[0].request("SCAN freq=2412-2462")
- ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], timeout=10)
- if ev is None:
- raise Exception("Scan did not start")
- if "FAIL" not in dev[0].request("FETCH_OSU"):
- raise Exception("FETCH_OSU accepted while scanning")
- ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 10)
- if ev is None:
- raise Exception("Scan timed out")
- hapd.set("ext_mgmt_frame_handling", "1")
- dev[0].request("FETCH_ANQP")
- if "FAIL" not in dev[0].request("FETCH_OSU"):
- raise Exception("FETCH_OSU accepted while in FETCH_ANQP")
- dev[0].request("STOP_FETCH_ANQP")
- dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5)
- dev[0].dump_monitor()
- hapd.dump_monitor()
- dev[0].request("INTERWORKING_SELECT freq=2412")
- for i in range(5):
- msg = hapd.mgmt_rx()
- if msg['subtype'] == 13:
- break
- if "FAIL" not in dev[0].request("FETCH_OSU"):
- raise Exception("FETCH_OSU accepted while in INTERWORKING_SELECT")
- ev = dev[0].wait_event(["INTERWORKING-AP", "INTERWORKING-NO-MATCH"],
- timeout=15)
- if ev is None:
- raise Exception("Network selection timed out")
-
- dev[0].dump_monitor()
- if "OK" not in dev[0].request("FETCH_OSU"):
- raise Exception("FETCH_OSU failed")
- dev[0].request("CANCEL_FETCH_OSU")
-
- for i in range(15):
- time.sleep(0.5)
- if dev[0].get_driver_status_field("scan_state") == "SCAN_COMPLETED":
- break
-
- dev[0].dump_monitor()
- if "OK" not in dev[0].request("FETCH_OSU"):
- raise Exception("FETCH_OSU failed")
- if "FAIL" not in dev[0].request("FETCH_OSU"):
- raise Exception("FETCH_OSU accepted while in FETCH_OSU")
- ev = dev[0].wait_event(["GAS-QUERY-START"], 10)
- if ev is None:
- raise Exception("GAS timed out")
- if "FAIL" not in dev[0].request("FETCH_OSU"):
- raise Exception("FETCH_OSU accepted while in FETCH_OSU")
- dev[0].request("CANCEL_FETCH_OSU")
- ev = dev[0].wait_event(["GAS-QUERY-DONE"], 10)
- if ev is None:
- raise Exception("GAS event timed out after CANCEL_FETCH_OSU")
- finally:
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- os.rmdir(dir)
-
-def test_ap_hs20_fetch_osu_proto(dev, apdev):
- """Hotspot 2.0 OSU provider and protocol testing"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- hapd = hostapd.add_ap(apdev[0], params)
-
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch"
- if os.path.isdir(dir):
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- else:
- try:
- os.makedirs(dir)
- except:
- pass
-
- tests = [("Empty provider list (no OSU SSID field)", b''),
- ("HS 2.0: Not enough room for OSU SSID",
- binascii.unhexlify('01')),
- ("HS 2.0: Invalid OSU SSID Length 33",
- binascii.unhexlify('21') + 33*b'A'),
- ("HS 2.0: Not enough room for Number of OSU Providers",
- binascii.unhexlify('0130')),
- ("Truncated OSU Provider",
- binascii.unhexlify('013001020000')),
- ("HS 2.0: Ignored 5 bytes of extra data after OSU Providers",
- binascii.unhexlify('0130001122334455')),
- ("HS 2.0: Not enough room for OSU Friendly Name Length",
- binascii.unhexlify('013001000000')),
- ("HS 2.0: Not enough room for OSU Friendly Name Duples",
- build_prov('0100')),
- ("Invalid OSU Friendly Name", build_prov('040000000000')),
- ("Invalid OSU Friendly Name(2)", build_prov('040004000000')),
- ("HS 2.0: Not enough room for OSU Server URI length",
- build_prov('0000')),
- ("HS 2.0: Not enough room for OSU Server URI",
- build_prov('000001')),
- ("HS 2.0: Not enough room for OSU Method list length",
- build_prov('000000')),
- ("HS 2.0: Not enough room for OSU Method list",
- build_prov('00000001')),
- ("HS 2.0: Not enough room for Icons Available Length",
- build_prov('00000000')),
- ("HS 2.0: Not enough room for Icons Available Length(2)",
- build_prov('00000001ff00')),
- ("HS 2.0: Not enough room for Icons Available",
- build_prov('000000000100')),
- ("HS 2.0: Invalid Icon Metadata",
- build_prov('00000000010000')),
- ("HS 2.0: Not room for Icon Type",
- build_prov('000000000900111122223333330200')),
- ("HS 2.0: Not room for Icon Filename length",
- build_prov('000000000900111122223333330100')),
- ("HS 2.0: Not room for Icon Filename",
- build_prov('000000000900111122223333330001')),
- ("HS 2.0: Not enough room for OSU_NAI",
- build_prov('000000000000')),
- ("HS 2.0: Not enough room for OSU_NAI(2)",
- build_prov('00000000000001')),
- ("HS 2.0: Not enough room for OSU Service Description Length",
- build_prov('00000000000000')),
- ("HS 2.0: Not enough room for OSU Service Description Length(2)",
- build_prov('0000000000000000')),
- ("HS 2.0: Not enough room for OSU Service Description Duples",
- build_prov('000000000000000100')),
- ("Invalid OSU Service Description",
- build_prov('00000000000000040000000000')),
- ("Invalid OSU Service Description(2)",
- build_prov('00000000000000040004000000'))]
-
- try:
- dev[0].request("SET osu_dir " + dir)
- run_fetch_osu_icon_failure(hapd, dev, bssid)
- for note, prov in tests:
- run_fetch_osu(hapd, dev, bssid, note, prov)
- finally:
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- os.rmdir(dir)
-
-def test_ap_hs20_fetch_osu_invalid_dir(dev, apdev):
- """Hotspot 2.0 OSU provider and invalid directory"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo.png"
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_icon'] = "w1fi_logo"
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch-no-such-dir"
- dev[0].scan_for_bss(bssid, freq="2412")
- dev[0].request("SET osu_dir " + dir)
- dev[0].request("FETCH_OSU no-scan")
- ev = dev[0].wait_event(["Could not write OSU provider information"],
- timeout=15)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
-
-def test_ap_hs20_fetch_osu_oom(dev, apdev):
- """Hotspot 2.0 OSU provider and OOM"""
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo.png"
- params['osu_ssid'] = '"HS 2.0 OSU open"'
- params['osu_method_list'] = "1"
- params['osu_friendly_name'] = ["eng:Test OSU", "fin:Testi-OSU"]
- params['osu_icon'] = "w1fi_logo"
- params['osu_service_desc'] = ["eng:Example services",
- "fin:Esimerkkipalveluja"]
- params['osu_server_uri'] = "https://example.com/osu/"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].hs20_enable()
- dir = "/tmp/osu-fetch"
- if os.path.isdir(dir):
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- else:
- try:
- os.makedirs(dir)
- except:
- pass
- dev[0].scan_for_bss(bssid, freq="2412")
- try:
- dev[0].request("SET osu_dir " + dir)
- with alloc_fail(dev[0], 1, "=hs20_osu_add_prov"):
- dev[0].request("FETCH_OSU no-scan")
- ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
- with alloc_fail(dev[0], 1, "hs20_anqp_send_req;hs20_next_osu_icon"):
- dev[0].request("FETCH_OSU no-scan")
- ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
- finally:
- files = [f for f in os.listdir(dir) if f.startswith("osu-")]
- for f in files:
- os.remove(dir + "/" + f)
- os.rmdir(dir)
-
-def build_prov(prov):
- data = binascii.unhexlify(prov)
- return binascii.unhexlify('013001') + struct.pack('<H', len(data)) + data
-
-def handle_osu_prov_fetch(hapd, dev, prov):
- # GAS/ANQP query for OSU Providers List
- query = gas_rx(hapd)
- gas = parse_gas(query['payload'])
- dialog_token = gas['dialog_token']
-
- resp = action_response(query)
- osu_prov = struct.pack('<HH', 0xdddd, len(prov) + 6) + binascii.unhexlify('506f9a110800') + prov
- data = struct.pack('<H', len(osu_prov)) + osu_prov
- resp['payload'] = anqp_initial_resp(dialog_token, 0) + data
- send_gas_resp(hapd, resp)
-
- ev = dev[0].wait_event(["RX-HS20-ANQP"], timeout=5)
- if ev is None:
- raise Exception("ANQP query response for OSU Providers not received")
- if "OSU Providers list" not in ev:
- raise Exception("ANQP query response for OSU Providers not received(2)")
- ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=5)
- if ev is None:
- raise Exception("ANQP query for OSU Providers list not completed")
-
-def start_osu_fetch(hapd, dev, bssid, note):
- hapd.set("ext_mgmt_frame_handling", "0")
- dev[0].request("BSS_FLUSH 0")
- dev[0].scan_for_bss(bssid, freq="2412")
- hapd.set("ext_mgmt_frame_handling", "1")
- dev[0].dump_monitor()
- dev[0].request("NOTE " + note)
- dev[0].request("FETCH_OSU no-scan")
-
-def wait_osu_fetch_completed(dev):
- ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=5)
- if ev is None:
- raise Exception("Timeout on OSU fetch")
-
-def run_fetch_osu_icon_failure(hapd, dev, bssid):
- start_osu_fetch(hapd, dev, bssid, "Icon fetch failure")
-
- prov = binascii.unhexlify('01ff' + '01' + '800019000b656e6754657374204f53550c66696e54657374692d4f53551868747470733a2f2f6578616d706c652e636f6d2f6f73752f01011b00800050007a787809696d6167652f706e6709773166695f6c6f676f002a0013656e674578616d706c652073657276696365731566696e4573696d65726b6b6970616c76656c756a61')
- handle_osu_prov_fetch(hapd, dev, prov)
-
- # GAS/ANQP query for icon
- query = gas_rx(hapd)
- gas = parse_gas(query['payload'])
- dialog_token = gas['dialog_token']
-
- resp = action_response(query)
- # Unexpected Advertisement Protocol in response
- adv_proto = struct.pack('8B', 108, 6, 127, 0xdd, 0x00, 0x11, 0x22, 0x33)
- data = struct.pack('<H', 0)
- resp['payload'] = struct.pack('<BBBHH', ACTION_CATEG_PUBLIC,
- GAS_INITIAL_RESPONSE,
- gas['dialog_token'], 0, 0) + adv_proto + data
- send_gas_resp(hapd, resp)
-
- ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=5)
- if ev is None:
- raise Exception("ANQP query for icon not completed")
-
- wait_osu_fetch_completed(dev)
-
-def run_fetch_osu(hapd, dev, bssid, note, prov):
- start_osu_fetch(hapd, dev, bssid, note)
- handle_osu_prov_fetch(hapd, dev, prov)
- wait_osu_fetch_completed(dev)
-
def test_ap_hs20_ft(dev, apdev):
"""Hotspot 2.0 connection with FT"""
check_eap_capa(dev[0], "MSCHAPV2")
if key_mgmt != "WPA2/IEEE 802.1X/EAP":
raise Exception("Unexpected key_mgmt: " + key_mgmt)
-def test_ap_hs20_remediation_sql(dev, apdev, params):
- """Hotspot 2.0 connection and remediation required using SQLite for user DB"""
- check_eap_capa(dev[0], "MSCHAPV2")
- try:
- import sqlite3
- except ImportError:
- raise HwsimSkip("No sqlite3 module available")
- dbfile = params['prefix'] + ".eap-user.db"
- try:
- os.remove(dbfile)
- except:
- pass
- con = sqlite3.connect(dbfile)
- with con:
- cur = con.cursor()
- cur.execute("CREATE TABLE users(identity TEXT PRIMARY KEY, methods TEXT, password TEXT, remediation TEXT, phase2 INTEGER)")
- cur.execute("CREATE TABLE wildcards(identity TEXT PRIMARY KEY, methods TEXT)")
- cur.execute("INSERT INTO users(identity,methods,password,phase2,remediation) VALUES ('user-mschapv2','TTLS-MSCHAPV2','password',1,'user')")
- cur.execute("INSERT INTO wildcards(identity,methods) VALUES ('','TTLS,TLS')")
- cur.execute("CREATE TABLE authlog(timestamp TEXT, session TEXT, nas_ip TEXT, username TEXT, note TEXT)")
-
- try:
- params = {"ssid": "as", "beacon_int": "2000",
- "radius_server_clients": "auth_serv/radius_clients.conf",
- "radius_server_auth_port": '18128',
- "eap_server": "1",
- "eap_user_file": "sqlite:" + dbfile,
- "ca_cert": "auth_serv/ca.pem",
- "server_cert": "auth_serv/server.pem",
- "private_key": "auth_serv/server.key",
- "subscr_remediation_url": "https://example.org/",
- "subscr_remediation_method": "1"}
- hostapd.add_ap(apdev[1], params)
-
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['auth_server_port'] = "18128"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].request("SET pmf 1")
- dev[0].hs20_enable()
- id = dev[0].add_cred_values({'realm': "example.com",
- 'username': "user-mschapv2",
- 'password': "password",
- 'ca_cert': "auth_serv/ca.pem"})
- interworking_select(dev[0], bssid, freq="2412")
- interworking_connect(dev[0], bssid, "TTLS")
- ev = dev[0].wait_event(["HS20-SUBSCRIPTION-REMEDIATION"], timeout=5)
- if ev is None:
- raise Exception("Timeout on subscription remediation notice")
- if " 1 https://example.org/" not in ev:
- raise Exception("Unexpected subscription remediation event contents")
-
- with con:
- cur = con.cursor()
- cur.execute("SELECT * from authlog")
- rows = cur.fetchall()
- if len(rows) < 1:
- raise Exception("No authlog entries")
-
- finally:
- os.remove(dbfile)
- dev[0].request("SET pmf 0")
-
-def test_ap_hs20_sim_provisioning(dev, apdev, params):
- """Hotspot 2.0 AAA server behavior for SIM provisioning"""
- check_eap_capa(dev[0], "SIM")
- try:
- import sqlite3
- except ImportError:
- raise HwsimSkip("No sqlite3 module available")
- dbfile = params['prefix'] + ".eap-user.db"
- try:
- os.remove(dbfile)
- except:
- pass
- con = sqlite3.connect(dbfile)
- with con:
- cur = con.cursor()
- cur.execute("CREATE TABLE users(identity TEXT PRIMARY KEY, methods TEXT, password TEXT, remediation TEXT, phase2 INTEGER, last_msk TEXT)")
- cur.execute("CREATE TABLE wildcards(identity TEXT PRIMARY KEY, methods TEXT)")
- cur.execute("INSERT INTO wildcards(identity,methods) VALUES ('1','SIM')")
- cur.execute("CREATE TABLE authlog(timestamp TEXT, session TEXT, nas_ip TEXT, username TEXT, note TEXT)")
- cur.execute("CREATE TABLE current_sessions(mac_addr TEXT PRIMARY KEY, identity TEXT, start_time TEXT, nas TEXT, hs20_t_c_filtering BOOLEAN, waiting_coa_ack BOOLEAN, coa_ack_received BOOLEAN)")
-
- try:
- params = {"ssid": "as", "beacon_int": "2000",
- "radius_server_clients": "auth_serv/radius_clients.conf",
- "radius_server_auth_port": '18128',
- "eap_server": "1",
- "eap_user_file": "sqlite:" + dbfile,
- "eap_sim_db": "unix:/tmp/hlr_auc_gw.sock",
- "ca_cert": "auth_serv/ca.pem",
- "server_cert": "auth_serv/server.pem",
- "private_key": "auth_serv/server.key",
- "hs20_sim_provisioning_url":
- "https://example.org/?hotspot2dot0-mobile-identifier-hash=",
- "subscr_remediation_method": "1"}
- hostapd.add_ap(apdev[1], params)
-
- bssid = apdev[0]['bssid']
- params = hs20_ap_params()
- params['auth_server_port'] = "18128"
- hostapd.add_ap(apdev[0], params)
-
- dev[0].request("SET pmf 1")
- dev[0].hs20_enable()
- dev[0].connect("test-hs20", proto="RSN", key_mgmt="WPA-EAP", eap="SIM",
- ieee80211w="1",
- identity="1232010000000000",
- password="90dca4eda45b53cf0f12d7c9c3bc6a89:cb9cccc4b9258e6dca4760379fb82581",
- scan_freq="2412", update_identifier="54321")
- ev = dev[0].wait_event(["HS20-SUBSCRIPTION-REMEDIATION"], timeout=0.5)
- if ev is not None:
- raise Exception("Unexpected subscription remediation notice")
- dev[0].request("REMOVE_NETWORK all")
- dev[0].wait_disconnected()
- dev[0].dump_monitor()
-
- dev[0].connect("test-hs20", proto="RSN", key_mgmt="WPA-EAP", eap="SIM",
- ieee80211w="1",
- identity="1232010000000000",
- password="90dca4eda45b53cf0f12d7c9c3bc6a89:cb9cccc4b9258e6dca4760379fb82581",
- scan_freq="2412", update_identifier="0")
- ev = dev[0].wait_event(["HS20-SUBSCRIPTION-REMEDIATION"], timeout=5)
- if ev is None:
- raise Exception("Timeout on subscription remediation notice")
- if " 1 https://example.org/?hotspot2dot0-mobile-identifier-hash=" not in ev:
- raise Exception("Unexpected subscription remediation event contents: " + ev)
- id_hash = ev.split(' ')[2].split('=')[1]
-
- with con:
- cur = con.cursor()
- cur.execute("SELECT * from authlog")
- rows = cur.fetchall()
- if len(rows) < 1:
- raise Exception("No authlog entries")
-
- with con:
- cur = con.cursor()
- cur.execute("SELECT * from sim_provisioning")
- rows = cur.fetchall()
- if len(rows) != 1:
- raise Exeception("Unexpected number of rows in sim_provisioning (%d; expected %d)" % (len(rows), 1))
- logger.info("sim_provisioning: " + str(rows))
- if len(rows[0][0]) != 32:
- raise Exception("Unexpected mobile_identifier_hash length in DB")
- if rows[0][1] != "232010000000000":
- raise Exception("Unexpected IMSI in DB")
- if rows[0][2] != dev[0].own_addr():
- raise Exception("Unexpected MAC address in DB")
- if rows[0][0] != id_hash:
- raise Exception("hotspot2dot0-mobile-identifier-hash mismatch")
- finally:
- dev[0].request("SET pmf 0")
-
def test_ap_hs20_external_selection(dev, apdev):
"""Hotspot 2.0 connection using external network selection and creation"""
check_eap_capa(dev[0], "MSCHAPV2")
con = sqlite3.connect(dbfile)
with con:
cur = con.cursor()
- cur.execute("CREATE TABLE users(identity TEXT PRIMARY KEY, methods TEXT, password TEXT, remediation TEXT, phase2 INTEGER, t_c_timestamp INTEGER)")
+ cur.execute("CREATE TABLE users(identity TEXT PRIMARY KEY, methods TEXT, password TEXT, phase2 INTEGER, t_c_timestamp INTEGER)")
cur.execute("CREATE TABLE wildcards(identity TEXT PRIMARY KEY, methods TEXT)")
cur.execute("INSERT INTO users(identity,methods,password,phase2) VALUES ('user-mschapv2','TTLS-MSCHAPV2','password',1)")
cur.execute("INSERT INTO wildcards(identity,methods) VALUES ('','TTLS,TLS')")