X-Git-Url: http://git.ipfire.org/?a=blobdiff_plain;f=tests%2Fhwsim%2Ftest_ap_hs20.py;h=acc391732730c31a97bc81d735c365eb587600f8;hb=7ab74770e730d7e223007d35bda052b8228afa83;hp=8d9e26e2c5056f11aab502aa67f885a5a103a374;hpb=1169516b6284a8b7d9e262768de2bee6c073dd12;p=thirdparty%2Fhostap.git diff --git a/tests/hwsim/test_ap_hs20.py b/tests/hwsim/test_ap_hs20.py index 8d9e26e2c..acc391732 100644 --- a/tests/hwsim/test_ap_hs20.py +++ b/tests/hwsim/test_ap_hs20.py @@ -17,12 +17,13 @@ import socket import subprocess import hostapd -from utils import HwsimSkip, skip_with_fips, alloc_fail, wait_fail_trigger +from utils import HwsimSkip, skip_with_fips, alloc_fail, fail_test, wait_fail_trigger import hwsim_utils from tshark import run_tshark from wlantest import Wlantest from wpasupplicant import WpaSupplicant from test_ap_eap import check_eap_capa, check_domain_match_full +from test_gas import gas_rx, parse_gas, action_response, anqp_initial_resp, send_gas_resp, ACTION_CATEG_PUBLIC, GAS_INITIAL_RESPONSE def hs20_ap_params(ssid="test-hs20"): params = hostapd.wpa2_params(ssid=ssid) @@ -97,7 +98,7 @@ def check_sp_type(dev, sp_type): if type is None: raise Exception("sp_type not available") if type != sp_type: - raise Exception("sp_type did not indicate home network") + raise Exception("sp_type did not indicate %s network" % sp_type) def hlr_auc_gw_available(): if not os.path.exists("/tmp/hlr_auc_gw.sock"): @@ -185,6 +186,9 @@ def test_ap_anqp_sharing(dev, apdev): dev[0].scan_for_bss(bssid2, freq="2412") interworking_select(dev[0], None, "home", freq="2412") dev[0].dump_monitor() + state = dev[0].get_status_field('wpa_state') + if state != "DISCONNECTED": + raise Exception("Unexpected wpa_state after INTERWORKING_SELECT: " + state) logger.debug("BSS entries:\n" + dev[0].request("BSS RANGE=ALL")) res1 = dev[0].get_bss(bssid) @@ -212,6 +216,85 @@ def test_ap_anqp_sharing(dev, apdev): if res1['anqp_nai_realm'] == res2['anqp_nai_realm']: raise Exception("ANQP results were not unshared") +def test_ap_anqp_domain_id(dev, apdev): + """ANQP Domain ID""" + check_eap_capa(dev[0], "MSCHAPV2") + dev[0].flush_scan_cache() + + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['anqp_domain_id'] = '1234' + hostapd.add_ap(apdev[0], params) + + bssid2 = apdev[1]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['anqp_domain_id'] = '1234' + hostapd.add_ap(apdev[1], params) + + dev[0].hs20_enable() + id = dev[0].add_cred_values({ 'realm': "example.com", 'username': "test", + 'password': "secret", + 'domain': "example.com" }) + dev[0].scan_for_bss(bssid, freq="2412") + dev[0].scan_for_bss(bssid2, freq="2412") + interworking_select(dev[0], None, "home", freq="2412") + +def test_ap_anqp_no_sharing_diff_ess(dev, apdev): + """ANQP no sharing between ESSs""" + check_eap_capa(dev[0], "MSCHAPV2") + dev[0].flush_scan_cache() + + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + hostapd.add_ap(apdev[0], params) + + bssid2 = apdev[1]['bssid'] + params = hs20_ap_params(ssid="test-hs20-another") + params['hessid'] = bssid + params['nai_realm'] = [ "0,example.com,13[5:6],21[2:4][5:7]" ] + hostapd.add_ap(apdev[1], params) + + dev[0].hs20_enable() + id = dev[0].add_cred_values({ 'realm': "example.com", 'username': "test", + 'password': "secret", + 'domain': "example.com" }) + logger.info("Normal network selection with shared ANQP results") + dev[0].scan_for_bss(bssid, freq="2412") + dev[0].scan_for_bss(bssid2, freq="2412") + interworking_select(dev[0], None, "home", freq="2412") + +def test_ap_anqp_no_sharing_missing_info(dev, apdev): + """ANQP no sharing due to missing information""" + check_eap_capa(dev[0], "MSCHAPV2") + dev[0].flush_scan_cache() + + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + del params['roaming_consortium'] + del params['domain_name'] + del params['anqp_3gpp_cell_net'] + del params['nai_realm'] + hostapd.add_ap(apdev[0], params) + + bssid2 = apdev[1]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['nai_realm'] = [ "0,example.com,13[5:6],21[2:4][5:7]" ] + hostapd.add_ap(apdev[1], params) + + dev[0].hs20_enable() + id = dev[0].add_cred_values({ 'realm': "example.com", 'username': "test", + 'password': "secret", + 'domain': "example.com" }) + logger.info("Normal network selection with shared ANQP results") + dev[0].scan_for_bss(bssid, freq="2412") + dev[0].scan_for_bss(bssid2, freq="2412") + interworking_select(dev[0], None, "home", freq="2412") + def test_ap_anqp_sharing_oom(dev, apdev): """ANQP sharing within ESS and explicit unshare OOM""" check_eap_capa(dev[0], "MSCHAPV2") @@ -348,6 +431,13 @@ def _test_ap_interworking_scan_filtering(dev, apdev): wt = Wlantest() wt.flush() + # Make sure wlantest has seen both BSSs to avoid issues in trying to clear + # counters for non-existing BSS. + dev[0].scan_for_bss(bssid, freq="2412") + dev[0].scan_for_bss(bssid2, freq="2412") + wt.clear_bss_counters(bssid) + wt.clear_bss_counters(bssid2) + logger.info("Check probe request filtering based on HESSID") dev[0].request("SET hessid " + bssid2) @@ -575,7 +665,7 @@ def test_ap_hs20_username(dev, apdev): status = dev[0].get_status() if status['pairwise_cipher'] != "CCMP": raise Exception("Unexpected pairwise cipher") - if status['hs20'] != "2": + if status['hs20'] != "3": raise Exception("Unexpected HS 2.0 support indication") dev[1].connect("test-hs20", key_mgmt="WPA-EAP", eap="TTLS", @@ -608,7 +698,7 @@ def test_ap_hs20_connect_api(dev, apdev): status = wpas.get_status() if status['pairwise_cipher'] != "CCMP": raise Exception("Unexpected pairwise cipher") - if status['hs20'] != "2": + if status['hs20'] != "3": raise Exception("Unexpected HS 2.0 support indication") def test_ap_hs20_auto_interworking(dev, apdev): @@ -633,7 +723,7 @@ def test_ap_hs20_auto_interworking(dev, apdev): status = dev[0].get_status() if status['pairwise_cipher'] != "CCMP": raise Exception("Unexpected pairwise cipher") - if status['hs20'] != "2": + if status['hs20'] != "3": raise Exception("Unexpected HS 2.0 support indication") @remote_compatible @@ -699,10 +789,12 @@ def test_ap_hs20_auto_interworking_no_cred_match(dev, apdev): raise Exception("Scan timed out") logger.info("Scan completed") -def eap_test(dev, ap, eap_params, method, user): +def eap_test(dev, ap, eap_params, method, user, release=0): bssid = ap['bssid'] params = hs20_ap_params() params['nai_realm'] = [ "0,example.com," + eap_params ] + if release > 0: + params['hs20_release'] = str(release) hostapd.add_ap(ap, params) dev.hs20_enable() @@ -761,6 +853,11 @@ def test_ap_hs20_eap_ttls_mschap(dev, apdev): skip_with_fips(dev[0]) eap_test(dev[0], apdev[0], "21[2:3]", "TTLS", "mschap user") +def test_ap_hs20_eap_ttls_default(dev, apdev): + """Hotspot 2.0 connection with TTLS/default""" + skip_with_fips(dev[0]) + eap_test(dev[0], apdev[0], "21", "TTLS", "hs20-test") + def test_ap_hs20_eap_ttls_eap_mschapv2(dev, apdev): """Hotspot 2.0 connection with TTLS/EAP-MSCHAPv2""" check_eap_capa(dev[0], "MSCHAPV2") @@ -915,6 +1012,48 @@ def test_ap_hs20_roaming_consortium(dev, apdev): raise Exception("Timeout on already-connected event") dev[0].remove_cred(id) +def test_ap_hs20_roaming_consortiums_match(dev, apdev): + """Hotspot 2.0 connection based on roaming_consortiums match""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + tests = [ ("112233", "112233"), + ("ffffff,1020304050,eeeeee", "1020304050") ] + for consortium,selected in tests: + id = dev[0].add_cred_values({ 'username': "user", + 'password': "password", + 'domain': "my.home.example.com", + 'ca_cert': "auth_serv/ca.pem", + 'roaming_consortiums': consortium, + 'eap': "PEAP" }) + interworking_select(dev[0], bssid, "roaming", freq="2412") + interworking_connect(dev[0], bssid, "PEAP") + check_sp_type(dev[0], "roaming") + network_id = dev[0].get_status_field("id") + sel = dev[0].get_network(network_id, "roaming_consortium_selection") + if sel != selected: + raise Exception("Unexpected roaming_consortium_selection value: " + + sel) + dev[0].request("INTERWORKING_SELECT auto freq=2412") + ev = dev[0].wait_event(["INTERWORKING-ALREADY-CONNECTED"], timeout=15) + if ev is None: + raise Exception("Timeout on already-connected event") + dev[0].remove_cred(id) + +def test_ap_hs20_max_roaming_consortiums(dev, apdev): + """Maximum number of cred roaming_consortiums""" + id = dev[0].add_cred() + consortium = (36*",ffffff")[1:] + if "OK" not in dev[0].request('SET_CRED %d roaming_consortiums "%s"' % (id, consortium)): + raise Exception("Maximum number of consortium OIs rejected") + consortium = (37*",ffffff")[1:] + if "FAIL" not in dev[0].request('SET_CRED %d roaming_consortiums "%s"' % (id, consortium)): + raise Exception("Over maximum number of consortium OIs accepted") + dev[0].remove_cred(id) + def test_ap_hs20_roaming_consortium_invalid(dev, apdev): """Hotspot 2.0 connection and invalid roaming consortium ANQP-element""" bssid = apdev[0]['bssid'] @@ -934,6 +1073,233 @@ def test_ap_hs20_roaming_consortium_invalid(dev, apdev): 'eap': "PEAP" }) interworking_select(dev[0], bssid, "home", freq="2412", no_match=True) +def test_ap_hs20_roaming_consortium_element(dev, apdev): + """Hotspot 2.0 connection and invalid roaming consortium element""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + del params['roaming_consortium'] + params['vendor_elements'] = '6f00' + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].scan_for_bss(bssid, freq="2412") + id = dev[0].add_cred_values({ 'username': "user", + 'password': "password", + 'domain': "example.com", + 'ca_cert': "auth_serv/ca.pem", + 'roaming_consortium': "112233", + 'eap': "PEAP" }) + interworking_select(dev[0], bssid, freq="2412", no_match=True) + + hapd.set('vendor_elements', '6f020001') + if "OK" not in hapd.request("UPDATE_BEACON"): + raise Exception("UPDATE_BEACON failed") + dev[0].request("BSS_FLUSH 0") + dev[0].scan_for_bss(bssid, freq="2412", force_scan=True) + interworking_select(dev[0], bssid, freq="2412", no_match=True) + +def test_ap_hs20_roaming_consortium_constraints(dev, apdev): + """Hotspot 2.0 connection and roaming consortium constraints""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['bss_load_test'] = "12:200:20000" + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + + vals = { 'username': "user", + 'password': "password", + 'domain': "example.com", + 'ca_cert': "auth_serv/ca.pem", + 'roaming_consortium': "fedcba", + 'eap': "TTLS" } + vals2 = vals.copy() + vals2['required_roaming_consortium'] = "223344" + id = dev[0].add_cred_values(vals2) + interworking_select(dev[0], bssid, "home", freq="2412", no_match=True) + dev[0].remove_cred(id) + + vals2 = vals.copy() + vals2['min_dl_bandwidth_home'] = "65500" + id = dev[0].add_cred_values(vals2) + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + if "below_min_backhaul=1" not in ev: + raise Exception("below_min_backhaul not reported") + dev[0].remove_cred(id) + + vals2 = vals.copy() + vals2['max_bss_load'] = "100" + id = dev[0].add_cred_values(vals2) + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + if "over_max_bss_load=1" not in ev: + raise Exception("over_max_bss_load not reported") + dev[0].remove_cred(id) + + vals2 = vals.copy() + vals2['req_conn_capab'] = "6:1234" + vals2['domain'] = 'example.org' + id = dev[0].add_cred_values(vals2) + + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + if "conn_capab_missing=1" not in ev: + raise Exception("conn_capab_missing not reported") + dev[0].remove_cred(id) + + values = default_cred() + values['roaming_consortium'] = "fedcba" + id3 = dev[0].add_cred_values(values) + + vals2 = vals.copy() + vals2['roaming_consortium'] = "fedcba" + vals2['priority'] = "2" + id = dev[0].add_cred_values(vals2) + + values = default_cred() + values['roaming_consortium'] = "fedcba" + id2 = dev[0].add_cred_values(values) + + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + dev[0].remove_cred(id) + dev[0].remove_cred(id2) + dev[0].remove_cred(id3) + +def test_ap_hs20_3gpp_constraints(dev, apdev): + """Hotspot 2.0 connection and 3GPP credential constraints""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['anqp_3gpp_cell_net'] = "555,444" + params['domain_name'] = "wlan.mnc444.mcc555.3gppnetwork.org" + params['bss_load_test'] = "12:200:20000" + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + + vals = { 'imsi': "555444-333222111", + 'eap': "SIM", + 'milenage': "5122250214c33e723a5dd523fc145fc0:981d464c7c52eb6e5036234984ad0bcf:000000000123" } + vals2 = vals.copy() + vals2['required_roaming_consortium'] = "223344" + id = dev[0].add_cred_values(vals2) + interworking_select(dev[0], bssid, "home", freq="2412", no_match=True) + dev[0].remove_cred(id) + + vals2 = vals.copy() + vals2['min_dl_bandwidth_home'] = "65500" + id = dev[0].add_cred_values(vals2) + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + if "below_min_backhaul=1" not in ev: + raise Exception("below_min_backhaul not reported") + dev[0].remove_cred(id) + + vals2 = vals.copy() + vals2['max_bss_load'] = "100" + id = dev[0].add_cred_values(vals2) + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + if "over_max_bss_load=1" not in ev: + raise Exception("over_max_bss_load not reported") + dev[0].remove_cred(id) + + values = default_cred() + values['roaming_consortium'] = "fedcba" + id3 = dev[0].add_cred_values(values) + + vals2 = vals.copy() + vals2['roaming_consortium'] = "fedcba" + vals2['priority'] = "2" + id = dev[0].add_cred_values(vals2) + + values = default_cred() + values['roaming_consortium'] = "fedcba" + id2 = dev[0].add_cred_values(values) + + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + dev[0].remove_cred(id) + dev[0].remove_cred(id2) + dev[0].remove_cred(id3) + + hapd.disable() + params = hs20_ap_params() + params['hessid'] = bssid + params['anqp_3gpp_cell_net'] = "555,444" + params['bss_load_test'] = "12:200:20000" + hapd = hostapd.add_ap(apdev[0], params) + vals2 = vals.copy() + vals2['req_conn_capab'] = "6:1234" + id = dev[0].add_cred_values(vals2) + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + if "conn_capab_missing=1" not in ev: + raise Exception("conn_capab_missing not reported") + dev[0].remove_cred(id) + +def test_ap_hs20_connect_no_full_match(dev, apdev): + """Hotspot 2.0 connection and no full match""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['anqp_3gpp_cell_net'] = "555,444" + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + + vals = { 'username': "user", + 'password': "password", + 'domain': "example.com", + 'ca_cert': "auth_serv/ca.pem", + 'roaming_consortium': "fedcba", + 'eap': "TTLS", + 'min_dl_bandwidth_home': "65500" } + id = dev[0].add_cred_values(vals) + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + if "below_min_backhaul=1" not in ev: + raise Exception("below_min_backhaul not reported") + interworking_connect(dev[0], bssid, "TTLS") + dev[0].remove_cred(id) + dev[0].wait_disconnected() + + vals = { 'imsi': "555444-333222111", 'eap': "SIM", + 'milenage': "5122250214c33e723a5dd523fc145fc0:981d464c7c52eb6e5036234984ad0bcf:000000000123", + 'min_dl_bandwidth_roaming': "65500" } + id = dev[0].add_cred_values(vals) + dev[0].request("INTERWORKING_SELECT freq=2412") + ev = dev[0].wait_event(["INTERWORKING-AP"], timeout=15) + if ev is None: + raise Exception("No AP found") + if "below_min_backhaul=1" not in ev: + raise Exception("below_min_backhaul not reported") + interworking_connect(dev[0], bssid, "SIM") + dev[0].remove_cred(id) + dev[0].wait_disconnected() + def test_ap_hs20_username_roaming(dev, apdev): """Hotspot 2.0 connection in username/password credential (roaming)""" check_eap_capa(dev[0], "MSCHAPV2") @@ -1805,6 +2171,20 @@ def test_ap_hs20_req_conn_capab(dev, apdev): if bssid2 in ev and "conn_capab_missing=1" in ev: raise Exception("Protocol connection capability not reported correctly") +def test_ap_hs20_req_conn_capab2(dev, apdev): + """Hotspot 2.0 network selection with req_conn_capab (not present)""" + check_eap_capa(dev[0], "MSCHAPV2") + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + del params['hs20_conn_capab'] + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].scan_for_bss(bssid, freq="2412") + values = conn_capab_cred(domain="example.org", req_conn_capab="6:1234") + id = dev[0].add_cred_values(values) + check_conn_capab_selection(dev[0], "roaming", False) + def test_ap_hs20_req_conn_capab_and_roaming_partner_preference(dev, apdev): """Hotspot 2.0 and req_conn_capab with roaming partner preference""" check_eap_capa(dev[0], "MSCHAPV2") @@ -2133,6 +2513,28 @@ def _test_ap_hs20_deauth_req_from_radius(dev, apdev): raise Exception("Unexpected deauth imminent contents") dev[0].wait_disconnected(timeout=3) +def test_ap_hs20_deauth_req_without_pmf(dev, apdev): + """Hotspot 2.0 connection and deauthentication request without PMF""" + check_eap_capa(dev[0], "MSCHAPV2") + dev[0].request("SET pmf 0") + eap_test(dev[0], apdev[0], "21[3:26]", "TTLS", "user", release=1) + dev[0].dump_monitor() + id = int(dev[0].get_status_field("id")) + dev[0].set_network(id, "ieee80211w", "0") + dev[0].request("DISCONNECT") + dev[0].wait_disconnected() + dev[0].select_network(id, freq=2412) + dev[0].wait_connected() + addr = dev[0].own_addr() + hapd = hostapd.Hostapd(apdev[0]['ifname']) + hapd.request("HS20_DEAUTH_REQ " + addr + " 1 120 http://example.com/") + ev = dev[0].wait_event(["HS20-DEAUTH-IMMINENT-NOTICE"], timeout=0.2) + if ev is not None: + raise Exception("Deauth imminent notice without PMF accepted") + with alloc_fail(hapd, 1, "wpabuf_alloc;hostapd_ctrl_iface_hs20_deauth_req"): + if "FAIL" not in hapd.request("HS20_DEAUTH_REQ " + addr + " 1 120 http://example.com/"): + raise Exception("HS20_DEAUTH_REQ accepted during OOM") + def test_ap_hs20_remediation_required(dev, apdev): """Hotspot 2.0 connection and remediation required from RADIUS""" check_eap_capa(dev[0], "MSCHAPV2") @@ -2201,6 +2603,8 @@ def _test_ap_hs20_remediation_required_ctrl(dev, apdev): raise Exception("Unexpected HS20_WNM_NOTIF success") if "FAIL" not in hapd.request("HS20_WNM_NOTIF " + addr + " https://12345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678923456789842345678456783456712345678927.very.long.example.com/"): raise Exception("Unexpected HS20_WNM_NOTIF success") + if "OK" not in hapd.request("HS20_WNM_NOTIF " + addr + " "): + raise Exception("HS20_WNM_NOTIF failed with empty URL") def test_ap_hs20_session_info(dev, apdev): """Hotspot 2.0 connection and session information from RADIUS""" @@ -2250,7 +2654,7 @@ def test_ap_hs20_osen(dev, apdev): scan_freq="2412", wait_connect=False) dev[0].flush_scan_cache() dev[0].connect("osen", proto="OSEN", key_mgmt="OSEN", pairwise="CCMP", - group="GTK_NOT_USED", + group="GTK_NOT_USED CCMP", eap="WFA-UNAUTH-TLS", identity="osen@example.com", ca_cert="auth_serv/ca.pem", scan_freq="2412") @@ -2266,12 +2670,47 @@ def test_ap_hs20_osen(dev, apdev): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add("wlan5", drv_params="force_connect_cmd=1") wpas.connect("osen", proto="OSEN", key_mgmt="OSEN", pairwise="CCMP", - group="GTK_NOT_USED", + group="GTK_NOT_USED CCMP", eap="WFA-UNAUTH-TLS", identity="osen@example.com", ca_cert="auth_serv/ca.pem", scan_freq="2412") wpas.request("DISCONNECT") +def test_ap_hs20_osen_single_ssid(dev, apdev): + """Hotspot 2.0 OSEN-single-SSID connection""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['wpa_key_mgmt'] = "WPA-EAP OSEN" + params['hessid'] = bssid + hapd = hostapd.add_ap(apdev[0], params) + + # RSN-OSEN (for OSU) + dev[0].connect("test-hs20", proto="OSEN", key_mgmt="OSEN", pairwise="CCMP", + group="CCMP GTK_NOT_USED", + eap="WFA-UNAUTH-TLS", identity="osen@example.com", + ca_cert="auth_serv/ca.pem", ieee80211w='2', + scan_freq="2412") + # RSN-EAP (for data connection) + dev[1].connect("test-hs20", key_mgmt="WPA-EAP", eap="TTLS", + identity="hs20-test", password="password", + ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2", + pairwise="CCMP", group="CCMP", + ieee80211w='2', scan_freq="2412") + + res = dev[0].get_bss(apdev[0]['bssid'])['flags'] + if "[WPA2-EAP+OSEN-CCMP]" not in res: + raise Exception("OSEN not reported in BSS") + if "[WEP]" in res: + raise Exception("WEP reported in BSS") + res = dev[0].request("SCAN_RESULTS") + if "[WPA2-EAP+OSEN-CCMP]" not in res: + raise Exception("OSEN not reported in SCAN_RESULTS") + + hwsim_utils.test_connectivity(dev[1], hapd) + hwsim_utils.test_connectivity(dev[0], hapd, broadcast=False) + hwsim_utils.test_connectivity(dev[0], hapd, timeout=1, + success_expected=False) + def test_ap_hs20_network_preference(dev, apdev): """Hotspot 2.0 network selection with preferred home network""" check_eap_capa(dev[0], "MSCHAPV2") @@ -2566,6 +3005,12 @@ def test_ap_hs20_fetch_osu(dev, apdev): raise Exception("GET_HS20_ICON 5..15 failed") if "FAIL" not in dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo 100000 10"): raise Exception("Unexpected success of GET_HS20_ICON with too large offset") + if "FAIL" not in dev[2].request("GET_HS20_ICON " + bssid + " no_such_logo 0 10"): + raise Exception("GET_HS20_ICON for not existing icon succeeded") + if "FAIL" not in dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo 0 3070"): + raise Exception("GET_HS20_ICON with too many output bytes to fit the buffer succeeded") + if "FAIL" not in dev[2].request("GET_HS20_ICON " + bssid + " w1fi_logo 0 0"): + raise Exception("GET_HS20_ICON 0..0 succeeded") icon = "" pos = 0 while True: @@ -2576,7 +3021,7 @@ def test_ap_hs20_fetch_osu(dev, apdev): break icon += base64.b64decode(res) pos += 1000 - hex = binascii.hexlify(icon) + hex = binascii.hexlify(icon).decode() if not hex.startswith("0009696d6167652f706e677d1d"): raise Exception("Unexpected beacon binary header: " + hex) with open('w1fi_logo.png', 'r') as f: @@ -2606,8 +3051,209 @@ def test_ap_hs20_fetch_osu(dev, apdev): if "FAIL" not in dev[i].request("DEL_HS20_ICON "): raise Exception("DEL_HS20_ICON accepted when no icons left") -def get_icon(dev, bssid, iconname): - icon = "" +def test_ap_hs20_fetch_osu_no_info(dev, apdev): + """Hotspot 2.0 OSU provider and no AP with info""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dir = "/tmp/osu-fetch" + if os.path.isdir(dir): + files = [ f for f in os.listdir(dir) if f.startswith("osu-") ] + for f in files: + os.remove(dir + "/" + f) + else: + try: + os.makedirs(dir) + except: + pass + dev[0].scan_for_bss(bssid, freq="2412") + try: + dev[0].request("SET osu_dir " + dir) + dev[0].request("FETCH_OSU") + ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30) + if ev is None: + raise Exception("Timeout on OSU fetch") + finally: + files = [ f for f in os.listdir(dir) if f.startswith("osu-") ] + for f in files: + os.remove(dir + "/" + f) + os.rmdir(dir) + +def test_ap_hs20_fetch_osu_no_icon(dev, apdev): + """Hotspot 2.0 OSU provider and no icon found""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo-no-file.png" + params['osu_ssid'] = '"HS 2.0 OSU open"' + params['osu_method_list'] = "1" + params['osu_friendly_name'] = [ "eng:Test OSU", "fin:Testi-OSU" ] + params['osu_icon'] = "w1fi_logo" + params['osu_service_desc'] = [ "eng:Example services", "fin:Esimerkkipalveluja" ] + params['osu_server_uri'] = "https://example.com/osu/" + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dir = "/tmp/osu-fetch" + if os.path.isdir(dir): + files = [ f for f in os.listdir(dir) if f.startswith("osu-") ] + for f in files: + os.remove(dir + "/" + f) + else: + try: + os.makedirs(dir) + except: + pass + dev[0].scan_for_bss(bssid, freq="2412") + try: + dev[0].request("SET osu_dir " + dir) + dev[0].request("FETCH_OSU") + ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30) + if ev is None: + raise Exception("Timeout on OSU fetch") + finally: + files = [ f for f in os.listdir(dir) if f.startswith("osu-") ] + for f in files: + os.remove(dir + "/" + f) + os.rmdir(dir) + +def test_ap_hs20_fetch_osu_single_ssid(dev, apdev): + """Hotspot 2.0 OSU provider and single SSID""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo-no-file.png" + params['osu_ssid'] = '"HS 2.0 OSU open"' + params['osu_method_list'] = "1" + params['osu_friendly_name'] = [ "eng:Test OSU", "fin:Testi-OSU" ] + params['osu_nai2'] = "osen@example.com" + params['osu_icon'] = "w1fi_logo" + params['osu_service_desc'] = [ "eng:Example services", "fin:Esimerkkipalveluja" ] + params['osu_server_uri'] = "https://example.com/osu/" + params['wpa_key_mgmt'] = "WPA-EAP OSEN" + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dir = "/tmp/osu-fetch" + if os.path.isdir(dir): + files = [ f for f in os.listdir(dir) if f.startswith("osu-") ] + for f in files: + os.remove(dir + "/" + f) + else: + try: + os.makedirs(dir) + except: + pass + dev[0].scan_for_bss(bssid, freq="2412") + try: + dev[0].request("SET osu_dir " + dir) + dev[0].request("FETCH_OSU") + ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30) + if ev is None: + raise Exception("Timeout on OSU fetch") + osu_ssid = False + osu_ssid2 = False + osu_nai = False + osu_nai2 = False + with open(os.path.join(dir, "osu-providers.txt"), "r") as f: + for l in f.readlines(): + logger.info(l.strip()) + if l.strip() == "osu_ssid=HS 2.0 OSU open": + osu_ssid = True + if l.strip() == "osu_ssid2=test-hs20": + osu_ssid2 = True + if l.strip().startswith("osu_nai="): + osu_nai = True + if l.strip() == "osu_nai2=osen@example.com": + osu_nai2 = True + if not osu_ssid: + raise Exception("osu_ssid not reported") + if not osu_ssid2: + raise Exception("osu_ssid2 not reported") + if osu_nai: + raise Exception("osu_nai reported unexpectedly") + if not osu_nai2: + raise Exception("osu_nai2 not reported") + finally: + files = [ f for f in os.listdir(dir) if f.startswith("osu-") ] + for f in files: + os.remove(dir + "/" + f) + os.rmdir(dir) + +def test_ap_hs20_fetch_osu_single_ssid2(dev, apdev): + """Hotspot 2.0 OSU provider and single SSID (two OSU providers)""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hs20_icon'] = "128:80:zxx:image/png:w1fi_logo:w1fi_logo-no-file.png" + params['osu_ssid'] = '"HS 2.0 OSU open"' + params['osu_method_list'] = "1" + params['osu_friendly_name'] = [ "eng:Test OSU", "fin:Testi-OSU" ] + params['osu_nai2'] = "osen@example.com" + params['osu_icon'] = "w1fi_logo" + params['osu_service_desc'] = [ "eng:Example services", "fin:Esimerkkipalveluja" ] + params['osu_server_uri'] = "https://example.com/osu/" + params['wpa_key_mgmt'] = "WPA-EAP OSEN" + hapd = hostapd.add_ap(apdev[0], params, no_enable=True) + + hapd.set('osu_server_uri', 'https://another.example.com/osu/') + hapd.set('osu_method_list', "1") + hapd.set('osu_nai2', "osen@another.example.com") + hapd.enable() + + dev[0].hs20_enable() + dir = "/tmp/osu-fetch" + if os.path.isdir(dir): + files = [ f for f in os.listdir(dir) if f.startswith("osu-") ] + for f in files: + os.remove(dir + "/" + f) + else: + try: + os.makedirs(dir) + except: + pass + dev[0].scan_for_bss(bssid, freq="2412") + try: + dev[0].request("SET osu_dir " + dir) + dev[0].request("FETCH_OSU") + ev = dev[0].wait_event(["OSU provider fetch completed"], timeout=30) + if ev is None: + raise Exception("Timeout on OSU fetch") + osu_ssid = False + osu_ssid2 = False + osu_nai = False + osu_nai2 = False + osu_nai2b = False + with open(os.path.join(dir, "osu-providers.txt"), "r") as f: + for l in f.readlines(): + logger.info(l.strip()) + if l.strip() == "osu_ssid=HS 2.0 OSU open": + osu_ssid = True + if l.strip() == "osu_ssid2=test-hs20": + osu_ssid2 = True + if l.strip().startswith("osu_nai="): + osu_nai = True + if l.strip() == "osu_nai2=osen@example.com": + osu_nai2 = True + if l.strip() == "osu_nai2=osen@another.example.com": + osu_nai2b = True + if not osu_ssid: + raise Exception("osu_ssid not reported") + if not osu_ssid2: + raise Exception("osu_ssid2 not reported") + if osu_nai: + raise Exception("osu_nai reported unexpectedly") + if not osu_nai2: + raise Exception("osu_nai2 not reported") + if not osu_nai2b: + raise Exception("osu_nai2b not reported") + finally: + files = [ f for f in os.listdir(dir) if f.startswith("osu-") ] + for f in files: + os.remove(dir + "/" + f) + os.rmdir(dir) + +def get_icon(dev, bssid, iconname): + icon = "" pos = 0 while True: if pos > 100000: @@ -2636,7 +3282,9 @@ def test_ap_hs20_req_hs20_icon(dev, apdev): hostapd.add_ap(apdev[0], params) dev[0].scan_for_bss(bssid, freq="2412") + run_req_hs20_icon(dev, bssid) +def run_req_hs20_icon(dev, bssid): # First, fetch two icons from the AP to wpa_supplicant if "OK" not in dev[0].request("REQ_HS20_ICON " + bssid + " w1fi_logo"): @@ -2673,6 +3321,108 @@ def test_ap_hs20_req_hs20_icon(dev, apdev): if "OK" not in dev[0].request("DEL_HS20_ICON " + bssid + " test_logo"): raise Exception("DEL_HS20_ICON failed") +def test_ap_hs20_req_operator_icon(dev, apdev): + """Hotspot 2.0 operator icons""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hs20_icon'] = [ "128:80:zxx:image/png:w1fi_logo:w1fi_logo.png", + "500:300:fi:image/png:test_logo:auth_serv/sha512-server.pem" ] + params['operator_icon'] = [ "w1fi_logo", "unknown_logo", "test_logo" ] + hostapd.add_ap(apdev[0], params) + + value = struct.pack('BBBBL3BB', 2, 1, 6, 0, 12345, 0, 0, 0, 0) payload += _ciaddr + _yiaddr + _siaddr + _giaddr + _chaddr + 192*'\x00' - udp = struct.pack('>HHHH', 67, 68, 8 + len(payload), 0) + payload + if udp_checksum: + pseudohdr = _ip_src + _ip_dst + struct.pack('>BBH', 0, 17, + 8 + len(payload)) + udphdr = struct.pack('>HHHH', 67, 68, 8 + len(payload), 0) + checksum, = struct.unpack('>H', ip_checksum(pseudohdr + udphdr + payload)) + else: + checksum = 0 + udp = struct.pack('>HHHH', 67, 68, 8 + len(payload), checksum) + payload if force_tot_len: tot_len = force_tot_len @@ -3463,12 +4559,12 @@ def send_arp(dev, dst_ll="ff:ff:ff:ff:ff:ff", src_ll=None, opcode=1, pkt = build_arp(dst_ll=dst_ll, src_ll=src_ll, opcode=opcode, sender_mac=sender_mac, sender_ip=sender_ip, target_mac=target_mac, target_ip=target_ip) - if "OK" not in dev.request(cmd + binascii.hexlify(pkt)): + if "OK" not in dev.request(cmd + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") def get_permanent_neighbors(ifname): cmd = subprocess.Popen(['ip', 'nei'], stdout=subprocess.PIPE) - res = cmd.stdout.read() + res = cmd.stdout.read().decode() cmd.stdout.close() return [ line for line in res.splitlines() if "PERMANENT" in line and ifname in line ] @@ -3554,26 +4650,12 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): if ebtables: for chain in [ 'FORWARD', 'OUTPUT' ]: - subprocess.call(['ebtables', '-A', chain, '-p', 'ARP', - '-d', 'Broadcast', '-o', apdev[0]['ifname'], - '-j', 'DROP']) - subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast', - '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp', - '--ip6-icmp-type', 'neighbor-solicitation', - '-o', apdev[0]['ifname'], '-j', 'DROP']) - subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast', - '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp', - '--ip6-icmp-type', 'neighbor-advertisement', - '-o', apdev[0]['ifname'], '-j', 'DROP']) - subprocess.call(['ebtables', '-A', chain, - '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp', - '--ip6-icmp-type', 'router-solicitation', - '-o', apdev[0]['ifname'], '-j', 'DROP']) - # Multicast Listener Report Message - subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast', - '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp', - '--ip6-icmp-type', '143', - '-o', apdev[0]['ifname'], '-j', 'DROP']) + try: + subprocess.call(['ebtables', '-A', chain, '-p', 'ARP', + '-d', 'Broadcast', '-o', apdev[0]['ifname'], + '-j', 'DROP']) + except: + raise HwsimSkip("No ebtables available") time.sleep(0.5) cmd = {} @@ -3596,13 +4678,13 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): time.sleep(0.1) brcmd = subprocess.Popen(['brctl', 'show'], stdout=subprocess.PIPE) - res = brcmd.stdout.read() + res = brcmd.stdout.read().decode() brcmd.stdout.close() logger.info("Bridge setup: " + res) brcmd = subprocess.Popen(['brctl', 'showstp', 'ap-br0'], stdout=subprocess.PIPE) - res = brcmd.stdout.read() + res = brcmd.stdout.read().decode() brcmd.stdout.close() logger.info("Bridge showstp: " + res) @@ -3610,56 +4692,31 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): addr1 = dev[1].p2p_interface_addr() addr2 = dev[2].p2p_interface_addr() - src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':','')) - src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':','')) - - # DAD NS - send_ns(dev[0], ip_src="::", target="aaaa:bbbb:cccc::2") - - send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2") - # test frame without source link-layer address option - send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2", - opt='') - # test frame with bogus option - send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2", - opt="\x70\x01\x01\x02\x03\x04\x05\x05") - # test frame with truncated source link-layer address option - send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2", - opt="\x01\x01\x01\x02\x03\x04") - # test frame with foreign source link-layer address option - send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2", - opt="\x01\x01\x01\x02\x03\x04\x05\x06") - - send_ns(dev[1], ip_src="aaaa:bbbb:dddd::2", target="aaaa:bbbb:dddd::2") - - send_ns(dev[1], ip_src="aaaa:bbbb:eeee::2", target="aaaa:bbbb:eeee::2") - # another copy for additional code coverage - send_ns(dev[1], ip_src="aaaa:bbbb:eeee::2", target="aaaa:bbbb:eeee::2") - pkt = build_dhcp_ack(dst_ll="ff:ff:ff:ff:ff:ff", src_ll=bssid, ip_src="192.168.1.1", ip_dst="255.255.255.255", yiaddr="192.168.1.124", chaddr=addr0) - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") # Change address and verify unicast pkt = build_dhcp_ack(dst_ll=addr0, src_ll=bssid, ip_src="192.168.1.1", ip_dst="255.255.255.255", - yiaddr="192.168.1.123", chaddr=addr0) - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + yiaddr="192.168.1.123", chaddr=addr0, + udp_checksum=False) + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") # Not-associated client MAC address pkt = build_dhcp_ack(dst_ll="ff:ff:ff:ff:ff:ff", src_ll=bssid, ip_src="192.168.1.1", ip_dst="255.255.255.255", yiaddr="192.168.1.125", chaddr="22:33:44:55:66:77") - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") # No IP address pkt = build_dhcp_ack(dst_ll=addr1, src_ll=bssid, ip_src="192.168.1.1", ip_dst="255.255.255.255", yiaddr="0.0.0.0", chaddr=addr1) - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") # Zero subnet mask @@ -3667,7 +4724,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): ip_src="192.168.1.1", ip_dst="255.255.255.255", yiaddr="192.168.1.126", chaddr=addr1, subnet_mask="0.0.0.0") - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") # Truncated option @@ -3675,7 +4732,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): ip_src="192.168.1.1", ip_dst="255.255.255.255", yiaddr="192.168.1.127", chaddr=addr1, truncated_opt=True) - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") # Wrong magic @@ -3683,7 +4740,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): ip_src="192.168.1.1", ip_dst="255.255.255.255", yiaddr="192.168.1.128", chaddr=addr1, wrong_magic=True) - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") # Wrong IPv4 total length @@ -3691,7 +4748,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): ip_src="192.168.1.1", ip_dst="255.255.255.255", yiaddr="192.168.1.129", chaddr=addr1, force_tot_len=1000) - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") # BOOTP @@ -3699,7 +4756,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): ip_src="192.168.1.1", ip_dst="255.255.255.255", yiaddr="192.168.1.129", chaddr=addr1, no_dhcp=True) - if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): raise Exception("DATA_TEST_FRAME failed") macs = get_bridge_macs("ap-br0") @@ -3707,14 +4764,8 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): matches = get_permanent_neighbors("ap-br0") logger.info("After connect: " + str(matches)) - if len(matches) != 4: + if len(matches) != 1: raise Exception("Unexpected number of neighbor entries after connect") - if 'aaaa:bbbb:cccc::2 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches: - raise Exception("dev0 addr missing") - if 'aaaa:bbbb:dddd::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches: - raise Exception("dev1 addr(1) missing") - if 'aaaa:bbbb:eeee::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches: - raise Exception("dev1 addr(2) missing") if '192.168.1.123 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches: raise Exception("dev0 IPv4 addr missing") @@ -3800,41 +4851,9 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): send_arp(dev[1], sender_ip="192.168.1.127", target_ip="192.168.1.123") send_arp(dev[0], sender_ip="192.168.1.123", target_ip="192.168.1.127") - time.sleep(0.1) - - send_ns(dev[0], target="aaaa:bbbb:dddd::2", ip_src="aaaa:bbbb:cccc::2") - time.sleep(0.1) - send_ns(dev[1], target="aaaa:bbbb:cccc::2", ip_src="aaaa:bbbb:dddd::2") - time.sleep(0.1) - send_ns(hapd, hapd_bssid=bssid, target="aaaa:bbbb:dddd::2", - ip_src="aaaa:bbbb:ffff::2") - time.sleep(0.1) - send_ns(dev[2], target="aaaa:bbbb:cccc::2", ip_src="aaaa:bbbb:ff00::2") - time.sleep(0.1) - send_ns(dev[2], target="aaaa:bbbb:dddd::2", ip_src="aaaa:bbbb:ff00::2") - time.sleep(0.1) - send_ns(dev[2], target="aaaa:bbbb:eeee::2", ip_src="aaaa:bbbb:ff00::2") - time.sleep(0.1) - - # Try to probe for an already assigned address - send_ns(dev[1], target="aaaa:bbbb:cccc::2", ip_src="::") - time.sleep(0.1) - send_ns(hapd, hapd_bssid=bssid, target="aaaa:bbbb:cccc::2", ip_src="::") - time.sleep(0.1) - send_ns(dev[2], target="aaaa:bbbb:cccc::2", ip_src="::") - time.sleep(0.1) - - # Unsolicited NA - send_na(dev[1], target="aaaa:bbbb:cccc:aeae::3", - ip_src="aaaa:bbbb:cccc:aeae::3", ip_dst="ff02::1") - send_na(hapd, hapd_bssid=bssid, target="aaaa:bbbb:cccc:aeae::4", - ip_src="aaaa:bbbb:cccc:aeae::4", ip_dst="ff02::1") - send_na(dev[2], target="aaaa:bbbb:cccc:aeae::5", - ip_src="aaaa:bbbb:cccc:aeae::5", ip_dst="ff02::1") - try: hwsim_utils.test_connectivity_iface(dev[0], hapd, "ap-br0") - except Exception, e: + except Exception as e: logger.info("test_connectibity_iface failed: " + str(e)) raise HwsimSkip("Assume kernel did not have the required patches for proxyarp") hwsim_utils.test_connectivity_iface(dev[1], hapd, "ap-br0") @@ -3854,7 +4873,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): if ebtables: cmd = subprocess.Popen(['ebtables', '-L', '--Lc'], stdout=subprocess.PIPE) - res = cmd.stdout.read() + res = cmd.stdout.read().decode() cmd.stdout.close() logger.info("ebtables results:\n" + res) @@ -3923,14 +4942,209 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False): # bssid, '192.168.1.101' ] not in arp_reply: # raise Exception("br did not get ARP response for 192.168.1.123") - ns = tshark_get_ns(cap_dev0) - logger.info("dev0 seen NS: " + str(ns)) - na = tshark_get_na(cap_dev0) - logger.info("dev0 seen NA: " + str(na)) - - if [ addr0, addr1, 'aaaa:bbbb:dddd::2', 'aaaa:bbbb:cccc::2', +def _test_proxyarp_open_ipv6(dev, apdev, params, ebtables=False): + prefix = "proxyarp_open" + if ebtables: + prefix += "_ebtables" + cap_br = os.path.join(params['logdir'], prefix + ".ap-br0.pcap") + cap_dev0 = os.path.join(params['logdir'], + prefix + ".%s.pcap" % dev[0].ifname) + cap_dev1 = os.path.join(params['logdir'], + prefix + ".%s.pcap" % dev[1].ifname) + cap_dev2 = os.path.join(params['logdir'], + prefix + ".%s.pcap" % dev[2].ifname) + + bssid = apdev[0]['bssid'] + params = { 'ssid': 'open' } + params['proxy_arp'] = '1' + hapd = hostapd.add_ap(apdev[0], params, no_enable=True) + hapd.set("ap_isolate", "1") + hapd.set('bridge', 'ap-br0') + hapd.dump_monitor() + try: + hapd.enable() + except: + # For now, do not report failures due to missing kernel support + raise HwsimSkip("Could not start hostapd - assume proxyarp not supported in kernel version") + ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=10) + if ev is None: + raise Exception("AP startup timed out") + if "AP-ENABLED" not in ev: + raise Exception("AP startup failed") + + params2 = { 'ssid': 'another' } + hapd2 = hostapd.add_ap(apdev[1], params2, no_enable=True) + hapd2.set('bridge', 'ap-br0') + hapd2.enable() + + subprocess.call(['brctl', 'setfd', 'ap-br0', '0']) + subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up']) + + if ebtables: + for chain in [ 'FORWARD', 'OUTPUT' ]: + try: + subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast', + '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp', + '--ip6-icmp-type', 'neighbor-solicitation', + '-o', apdev[0]['ifname'], '-j', 'DROP']) + subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast', + '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp', + '--ip6-icmp-type', 'neighbor-advertisement', + '-o', apdev[0]['ifname'], '-j', 'DROP']) + subprocess.call(['ebtables', '-A', chain, + '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp', + '--ip6-icmp-type', 'router-solicitation', + '-o', apdev[0]['ifname'], '-j', 'DROP']) + # Multicast Listener Report Message + subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast', + '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp', + '--ip6-icmp-type', '143', + '-o', apdev[0]['ifname'], '-j', 'DROP']) + except: + raise HwsimSkip("No ebtables available") + + time.sleep(0.5) + cmd = {} + cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0', + '-w', cap_br, '-s', '2000'], + stderr=open('/dev/null', 'w')) + cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname, + '-w', cap_dev0, '-s', '2000'], + stderr=open('/dev/null', 'w')) + cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname, + '-w', cap_dev1, '-s', '2000'], + stderr=open('/dev/null', 'w')) + cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname, + '-w', cap_dev2, '-s', '2000'], + stderr=open('/dev/null', 'w')) + + dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") + dev[1].connect("open", key_mgmt="NONE", scan_freq="2412") + dev[2].connect("another", key_mgmt="NONE", scan_freq="2412") + time.sleep(0.1) + + brcmd = subprocess.Popen(['brctl', 'show'], stdout=subprocess.PIPE) + res = brcmd.stdout.read().decode() + brcmd.stdout.close() + logger.info("Bridge setup: " + res) + + brcmd = subprocess.Popen(['brctl', 'showstp', 'ap-br0'], + stdout=subprocess.PIPE) + res = brcmd.stdout.read().decode() + brcmd.stdout.close() + logger.info("Bridge showstp: " + res) + + addr0 = dev[0].p2p_interface_addr() + addr1 = dev[1].p2p_interface_addr() + addr2 = dev[2].p2p_interface_addr() + + src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':','')) + src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':','')) + + # DAD NS + send_ns(dev[0], ip_src="::", target="aaaa:bbbb:cccc::2") + + send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2") + # test frame without source link-layer address option + send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2", + opt='') + # test frame with bogus option + send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2", + opt="\x70\x01\x01\x02\x03\x04\x05\x05") + # test frame with truncated source link-layer address option + send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2", + opt="\x01\x01\x01\x02\x03\x04") + # test frame with foreign source link-layer address option + send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2", + opt="\x01\x01\x01\x02\x03\x04\x05\x06") + + send_ns(dev[1], ip_src="aaaa:bbbb:dddd::2", target="aaaa:bbbb:dddd::2") + + send_ns(dev[1], ip_src="aaaa:bbbb:eeee::2", target="aaaa:bbbb:eeee::2") + # another copy for additional code coverage + send_ns(dev[1], ip_src="aaaa:bbbb:eeee::2", target="aaaa:bbbb:eeee::2") + + macs = get_bridge_macs("ap-br0") + logger.info("After connect (showmacs): " + str(macs)) + + matches = get_permanent_neighbors("ap-br0") + logger.info("After connect: " + str(matches)) + if len(matches) != 3: + raise Exception("Unexpected number of neighbor entries after connect") + if 'aaaa:bbbb:cccc::2 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches: + raise Exception("dev0 addr missing") + if 'aaaa:bbbb:dddd::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches: + raise Exception("dev1 addr(1) missing") + if 'aaaa:bbbb:eeee::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches: + raise Exception("dev1 addr(2) missing") + + send_ns(dev[0], target="aaaa:bbbb:dddd::2", ip_src="aaaa:bbbb:cccc::2") + time.sleep(0.1) + send_ns(dev[1], target="aaaa:bbbb:cccc::2", ip_src="aaaa:bbbb:dddd::2") + time.sleep(0.1) + send_ns(hapd, hapd_bssid=bssid, target="aaaa:bbbb:dddd::2", + ip_src="aaaa:bbbb:ffff::2") + time.sleep(0.1) + send_ns(dev[2], target="aaaa:bbbb:cccc::2", ip_src="aaaa:bbbb:ff00::2") + time.sleep(0.1) + send_ns(dev[2], target="aaaa:bbbb:dddd::2", ip_src="aaaa:bbbb:ff00::2") + time.sleep(0.1) + send_ns(dev[2], target="aaaa:bbbb:eeee::2", ip_src="aaaa:bbbb:ff00::2") + time.sleep(0.1) + + # Try to probe for an already assigned address + send_ns(dev[1], target="aaaa:bbbb:cccc::2", ip_src="::") + time.sleep(0.1) + send_ns(hapd, hapd_bssid=bssid, target="aaaa:bbbb:cccc::2", ip_src="::") + time.sleep(0.1) + send_ns(dev[2], target="aaaa:bbbb:cccc::2", ip_src="::") + time.sleep(0.1) + + # Unsolicited NA + send_na(dev[1], target="aaaa:bbbb:cccc:aeae::3", + ip_src="aaaa:bbbb:cccc:aeae::3", ip_dst="ff02::1") + send_na(hapd, hapd_bssid=bssid, target="aaaa:bbbb:cccc:aeae::4", + ip_src="aaaa:bbbb:cccc:aeae::4", ip_dst="ff02::1") + send_na(dev[2], target="aaaa:bbbb:cccc:aeae::5", + ip_src="aaaa:bbbb:cccc:aeae::5", ip_dst="ff02::1") + + try: + hwsim_utils.test_connectivity_iface(dev[0], hapd, "ap-br0") + except Exception as e: + logger.info("test_connectibity_iface failed: " + str(e)) + raise HwsimSkip("Assume kernel did not have the required patches for proxyarp") + hwsim_utils.test_connectivity_iface(dev[1], hapd, "ap-br0") + hwsim_utils.test_connectivity(dev[0], dev[1]) + + dev[0].request("DISCONNECT") + dev[1].request("DISCONNECT") + time.sleep(0.5) + for i in range(len(cmd)): + cmd[i].terminate() + macs = get_bridge_macs("ap-br0") + logger.info("After disconnect (showmacs): " + str(macs)) + matches = get_permanent_neighbors("ap-br0") + logger.info("After disconnect: " + str(matches)) + if len(matches) > 0: + raise Exception("Unexpected neighbor entries after disconnect") + if ebtables: + cmd = subprocess.Popen(['ebtables', '-L', '--Lc'], + stdout=subprocess.PIPE) + res = cmd.stdout.read().decode() + cmd.stdout.close() + logger.info("ebtables results:\n" + res) + + ns = tshark_get_ns(cap_dev0) + logger.info("dev0 seen NS: " + str(ns)) + na = tshark_get_na(cap_dev0) + logger.info("dev0 seen NA: " + str(na)) + + if [ addr0, addr1, 'aaaa:bbbb:dddd::2', 'aaaa:bbbb:cccc::2', 'aaaa:bbbb:dddd::2', addr1 ] not in na: - raise Exception("dev0 did not get NA for aaaa:bbbb:dddd::2") + # For now, skip the test instead of reporting the error since the IPv6 + # proxyarp support is not yet in the upstream kernel tree. + #raise Exception("dev0 did not get NA for aaaa:bbbb:dddd::2") + raise HwsimSkip("Assume kernel did not have the required patches for proxyarp (IPv6)") if ebtables: for req in ns: @@ -3977,6 +5191,16 @@ def test_proxyarp_open(dev, apdev, params): subprocess.call(['brctl', 'delbr', 'ap-br0'], stderr=open('/dev/null', 'w')) +def test_proxyarp_open_ipv6(dev, apdev, params): + """ProxyARP with open network (IPv6)""" + try: + _test_proxyarp_open_ipv6(dev, apdev, params) + finally: + subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'down'], + stderr=open('/dev/null', 'w')) + subprocess.call(['brctl', 'delbr', 'ap-br0'], + stderr=open('/dev/null', 'w')) + def test_proxyarp_open_ebtables(dev, apdev, params): """ProxyARP with open network""" try: @@ -3992,6 +5216,86 @@ def test_proxyarp_open_ebtables(dev, apdev, params): subprocess.call(['brctl', 'delbr', 'ap-br0'], stderr=open('/dev/null', 'w')) +def test_proxyarp_open_ebtables_ipv6(dev, apdev, params): + """ProxyARP with open network (IPv6)""" + try: + _test_proxyarp_open_ipv6(dev, apdev, params, ebtables=True) + finally: + try: + subprocess.call(['ebtables', '-F', 'FORWARD']) + subprocess.call(['ebtables', '-F', 'OUTPUT']) + except: + pass + subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'down'], + stderr=open('/dev/null', 'w')) + subprocess.call(['brctl', 'delbr', 'ap-br0'], + stderr=open('/dev/null', 'w')) + +def test_proxyarp_errors(dev, apdev, params): + """ProxyARP error cases""" + try: + run_proxyarp_errors(dev, apdev, params) + finally: + subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'down'], + stderr=open('/dev/null', 'w')) + subprocess.call(['brctl', 'delbr', 'ap-br0'], + stderr=open('/dev/null', 'w')) + +def run_proxyarp_errors(dev, apdev, params): + params = { 'ssid': 'open', + 'proxy_arp': '1', + 'ap_isolate': '1', + 'bridge': 'ap-br0', + 'disable_dgaf': '1' } + hapd = hostapd.add_ap(apdev[0], params, no_enable=True) + try: + hapd.enable() + except: + # For now, do not report failures due to missing kernel support + raise HwsimSkip("Could not start hostapd - assume proxyarp not supported in kernel version") + ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=10) + if ev is None: + raise Exception("AP startup timed out") + if "AP-ENABLED" not in ev: + raise Exception("AP startup failed") + + hapd.disable() + with alloc_fail(hapd, 1, "l2_packet_init;x_snoop_get_l2_packet;dhcp_snoop_init"): + if "FAIL" not in hapd.request("ENABLE"): + raise Exception("ENABLE accepted unexpectedly") + with alloc_fail(hapd, 1, "l2_packet_init;x_snoop_get_l2_packet;ndisc_snoop_init"): + if "FAIL" not in hapd.request("ENABLE"): + raise Exception("ENABLE accepted unexpectedly") + with fail_test(hapd, 1, "l2_packet_set_packet_filter;x_snoop_get_l2_packet;ndisc_snoop_init"): + if "FAIL" not in hapd.request("ENABLE"): + raise Exception("ENABLE accepted unexpectedly") + with fail_test(hapd, 1, "l2_packet_set_packet_filter;x_snoop_get_l2_packet;dhcp_snoop_init"): + if "FAIL" not in hapd.request("ENABLE"): + raise Exception("ENABLE accepted unexpectedly") + hapd.enable() + + subprocess.call(['brctl', 'setfd', 'ap-br0', '0']) + subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up']) + + dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") + addr0 = dev[0].own_addr() + + pkt = build_ra(src_ll=apdev[0]['bssid'], ip_src="aaaa:bbbb:cccc::33", + ip_dst="ff01::1") + with fail_test(hapd, 1, "x_snoop_mcast_to_ucast_convert_send"): + if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()): + raise Exception("DATA_TEST_FRAME failed") + wait_fail_trigger(dev[0], "GET_FAIL") + + with alloc_fail(hapd, 1, "sta_ip6addr_add"): + src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':','')) + pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2", + opt=src_ll_opt0) + if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()): + raise Exception("DATA_TEST_FRAME failed") + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + def test_ap_hs20_connect_deinit(dev, apdev): """Hotspot 2.0 connection interrupted with deinit""" check_eap_capa(dev[0], "MSCHAPV2") @@ -4155,3 +5459,905 @@ def test_ap_hs20_interworking_oom(dev, apdev): if ev is None: raise Exception("ANQP did not start") wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + +def test_ap_hs20_no_cred_connect(dev, apdev): + """Hotspot 2.0 and connect attempt without credential""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].scan_for_bss(bssid, freq="2412") + if "FAIL" not in dev[0].request("INTERWORKING_CONNECT " + bssid): + raise Exception("Unexpected INTERWORKING_CONNECT success") + +def test_ap_hs20_no_rsn_connect(dev, apdev): + """Hotspot 2.0 and connect attempt without RSN""" + bssid = apdev[0]['bssid'] + params = hostapd.wpa_params(ssid="test-hs20") + params['wpa_key_mgmt'] = "WPA-EAP" + params['ieee80211w'] = "1" + params['ieee8021x'] = "1" + params['auth_server_addr'] = "127.0.0.1" + params['auth_server_port'] = "1812" + params['auth_server_shared_secret'] = "radius" + params['interworking'] = "1" + params['roaming_consortium'] = [ "112233", "1020304050", "010203040506", + "fedcba" ] + params['nai_realm'] = [ "0,example.com,13[5:6],21[2:4][5:7]", + "0,another.example.com" ] + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].scan_for_bss(bssid, freq="2412") + + id = dev[0].add_cred_values({ 'realm': "example.com", + 'username': "test", + 'password': "secret", + 'domain': "example.com", + 'roaming_consortium': "112233", + 'eap': 'TTLS' }) + + interworking_select(dev[0], bssid, freq=2412, no_match=True) + if "FAIL" not in dev[0].request("INTERWORKING_CONNECT " + bssid): + raise Exception("Unexpected INTERWORKING_CONNECT success") + +def test_ap_hs20_no_match_connect(dev, apdev): + """Hotspot 2.0 and connect attempt without matching cred""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].scan_for_bss(bssid, freq="2412") + + id = dev[0].add_cred_values({ 'realm': "example.org", + 'username': "test", + 'password': "secret", + 'domain': "example.org", + 'roaming_consortium': "112234", + 'eap': 'TTLS' }) + + interworking_select(dev[0], bssid, freq=2412, no_match=True) + if "FAIL" not in dev[0].request("INTERWORKING_CONNECT " + bssid): + raise Exception("Unexpected INTERWORKING_CONNECT success") + +def test_ap_hs20_multiple_home_cred(dev, apdev): + """Hotspot 2.0 and select with multiple matching home credentials""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['nai_realm'] = [ "0,example.com,13[5:6],21[2:4][5:7]" ] + params['domain_name'] = "example.com" + hapd = hostapd.add_ap(apdev[0], params) + + bssid2 = apdev[1]['bssid'] + params = hs20_ap_params(ssid="test-hs20-other") + params['hessid'] = bssid2 + params['nai_realm'] = [ "0,example.org,13[5:6],21[2:4][5:7]" ] + params['domain_name'] = "example.org" + hapd2 = hostapd.add_ap(apdev[1], params) + + dev[0].hs20_enable() + dev[0].scan_for_bss(bssid2, freq="2412") + dev[0].scan_for_bss(bssid, freq="2412") + id = dev[0].add_cred_values({ 'realm': "example.com", + 'priority': '2', + 'username': "hs20-test", + 'password': "password", + 'domain': "example.com" }) + id2 = dev[0].add_cred_values({ 'realm': "example.org", + 'priority': '3', + 'username': "hs20-test", + 'password': "password", + 'domain': "example.org" }) + dev[0].request("INTERWORKING_SELECT auto freq=2412") + ev = dev[0].wait_connected(timeout=15) + if bssid2 not in ev: + raise Exception("Connected to incorrect network") + +def test_ap_hs20_anqp_invalid_gas_response(dev, apdev): + """Hotspot 2.0 network selection and invalid GAS response""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].scan_for_bss(bssid, freq="2412") + hapd.set("ext_mgmt_frame_handling", "1") + + dev[0].hs20_enable() + + id = dev[0].add_cred_values({ 'realm': "example.com", + 'username': "test", + 'password': "secret", + 'domain': "example.com", + 'roaming_consortium': "112234", + 'eap': 'TTLS' }) + dev[0].request("INTERWORKING_SELECT freq=2412") + + query = gas_rx(hapd) + gas = parse_gas(query['payload']) + + logger.info("ANQP: Unexpected Advertisement Protocol in response") + resp = action_response(query) + adv_proto = struct.pack('8B', 108, 6, 127, 0xdd, 0x00, 0x11, 0x22, 0x33) + data = struct.pack('eap.eap_methods = os_malloc()") + with alloc_fail(dev[0], 1, "interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].remove_cred(id) + + id = dev[0].add_cred_values({ 'realm': "example.com", + 'domain': "example.com", + 'username': "hs20-test-with-domain@example.com", + 'password': "password" }) + interworking_select(dev[0], bssid, "home", freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE anon = os_malloc()") + with alloc_fail(dev[0], 1, "interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE Successful connection with cred->username including realm") + dev[0].request("INTERWORKING_CONNECT " + bssid) + dev[0].wait_connected() + dev[0].remove_cred(id) + dev[0].wait_disconnected() + + id = dev[0].add_cred_values({ 'realm': "example.com", + 'domain': "example.com", + 'username': "hs20-test", + 'password': "password" }) + interworking_select(dev[0], bssid, "home", freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE anon = os_malloc() (second)") + with alloc_fail(dev[0], 1, "interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + with alloc_fail(dev[0], 1, "wpa_config_add_network;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + with alloc_fail(dev[0], 1, "=interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set(eap)") + with alloc_fail(dev[0], 1, "wpa_config_parse_eap;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set(TTLS-NON_EAP_MSCHAPV2-phase2)") + with alloc_fail(dev[0], 1, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].remove_cred(id) + + id = dev[0].add_cred_values({ 'roaming_consortium': "112233", + 'domain': "example.com", + 'username': "hs20-test", + 'password': "password", + 'eap': 'TTLS', + 'phase2': "auth=MSCHAPV2" }) + interworking_select(dev[0], bssid, "home", freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE anon = os_strdup()") + with alloc_fail(dev[0], 2, "interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set_quoted(anonymous_identity)") + with alloc_fail(dev[0], 1, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE Successful connection with cred->realm not included") + dev[0].request("INTERWORKING_CONNECT " + bssid) + dev[0].wait_connected() + dev[0].remove_cred(id) + dev[0].wait_disconnected() + + id = dev[0].add_cred_values({ 'roaming_consortium': "112233", + 'domain': "example.com", + 'realm': "example.com", + 'username': "user", + 'password': "password", + 'eap': 'PEAP' }) + interworking_select(dev[0], bssid, "home", freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE id = os_strdup()") + with alloc_fail(dev[0], 2, "interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set_quoted(identity)") + with alloc_fail(dev[0], 1, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].remove_cred(id) + + id = dev[0].add_cred_values({ 'roaming_consortium': "112233", + 'domain': "example.com", + 'realm': "example.com", + 'username': "user", + 'password': "password", + 'eap': "TTLS" }) + interworking_select(dev[0], bssid, "home", freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE wpa_config_set_quoted(identity) (second)") + with alloc_fail(dev[0], 2, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set_quoted(password)") + with alloc_fail(dev[0], 3, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + with alloc_fail(dev[0], 1, "wpa_config_add_network;interworking_connect_roaming_consortium"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + with alloc_fail(dev[0], 1, "=interworking_connect_roaming_consortium"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].remove_cred(id) + + id = dev[0].add_cred_values({ 'roaming_consortium': "112233", + 'domain': "example.com", + 'realm': "example.com", + 'username': "user", + 'eap': "PEAP" }) + dev[0].set_cred(id, "password", "ext:password") + interworking_select(dev[0], bssid, "home", freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE wpa_config_set(password)") + with alloc_fail(dev[0], 3, "wpa_config_set;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + with alloc_fail(dev[0], 1, "interworking_set_hs20_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].remove_cred(id) + + id = dev[0].add_cred_values({ 'realm': "example.com", + 'domain': "example.com", + 'username': "certificate-user", + 'phase1': "include_tls_length=0", + 'domain_suffix_match': "example.com", + 'ca_cert': "auth_serv/ca.pem", + 'client_cert': "auth_serv/user.pem", + 'private_key': "auth_serv/user.key", + 'private_key_passwd': "secret" }) + interworking_select(dev[0], bssid, "home", freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE wpa_config_set_quoted(client_cert)") + with alloc_fail(dev[0], 2, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set_quoted(private_key)") + with alloc_fail(dev[0], 3, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set_quoted(private_key_passwd)") + with alloc_fail(dev[0], 4, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set_quoted(ca_cert)") + with alloc_fail(dev[0], 5, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set_quoted(domain_suffix_match)") + with alloc_fail(dev[0], 6, "=wpa_config_set_quoted;interworking_set_eap_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + with alloc_fail(dev[0], 1, "interworking_set_hs20_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].remove_cred(id) + + id = dev[0].add_cred_values({ 'imsi': "555444-333222111", 'eap': "SIM", + 'milenage': "5122250214c33e723a5dd523fc145fc0:981d464c7c52eb6e5036234984ad0bcf:000000000123"}) + interworking_select(dev[0], bssid, freq=2412) + dev[0].dump_monitor() + with alloc_fail(dev[0], 1, "interworking_set_hs20_params"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set_quoted(password;milenage)") + with alloc_fail(dev[0], 2, "=wpa_config_set_quoted;interworking_connect_3gpp"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set(eap)") + with alloc_fail(dev[0], 1, "wpa_config_parse_eap;wpa_config_set;interworking_connect_3gpp"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE set_root_nai:wpa_config_set(identity)") + with alloc_fail(dev[0], 1, "wpa_config_parse_str;interworking_connect_3gpp"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].remove_cred(id) + + id = dev[0].add_cred_values({ 'roaming_consortium': "112233", + 'username': "user@example.com", + 'password': "password" }) + interworking_select(dev[0], bssid, freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE Interworking: No EAP method set for credential using roaming consortium") + dev[0].request("INTERWORKING_CONNECT " + bssid) + dev[0].remove_cred(id) + + hapd.disable() + params = hs20_ap_params() + params['nai_realm'] = "0,example.com,25[3:26]" + hapd = hostapd.add_ap(apdev[0], params) + id = dev[0].add_cred_values({ 'realm': "example.com", + 'domain': "example.com", + 'username': "hs20-test", + 'password': "password" }) + interworking_select(dev[0], bssid, freq=2412) + dev[0].dump_monitor() + dev[0].request("NOTE wpa_config_set(PEAP/FAST-phase1)") + with alloc_fail(dev[0], 1, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set(PEAP/FAST-pac_interworking)") + with alloc_fail(dev[0], 2, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + dev[0].request("NOTE wpa_config_set(PEAP/FAST-phase2)") + with alloc_fail(dev[0], 3, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + + hapd.disable() + params = hs20_ap_params() + params['nai_realm'] = "0,example.com,21" + hapd = hostapd.add_ap(apdev[0], params) + interworking_select(dev[0], bssid, freq=2412) + dev[0].request("NOTE wpa_config_set(TTLS-defaults-phase2)") + with alloc_fail(dev[0], 1, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + + hapd.disable() + params = hs20_ap_params() + params['nai_realm'] = "0,example.com,21[2:3]" + hapd = hostapd.add_ap(apdev[0], params) + interworking_select(dev[0], bssid, freq=2412) + dev[0].request("NOTE wpa_config_set(TTLS-NON_EAP_MSCHAP-phase2)") + with alloc_fail(dev[0], 1, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + + hapd.disable() + params = hs20_ap_params() + params['nai_realm'] = "0,example.com,21[2:2]" + hapd = hostapd.add_ap(apdev[0], params) + interworking_select(dev[0], bssid, freq=2412) + dev[0].request("NOTE wpa_config_set(TTLS-NON_EAP_CHAP-phase2)") + with alloc_fail(dev[0], 1, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + + hapd.disable() + params = hs20_ap_params() + params['nai_realm'] = "0,example.com,21[2:1]" + hapd = hostapd.add_ap(apdev[0], params) + interworking_select(dev[0], bssid, freq=2412) + dev[0].request("NOTE wpa_config_set(TTLS-NON_EAP_PAP-phase2)") + with alloc_fail(dev[0], 1, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + + hapd.disable() + params = hs20_ap_params() + params['nai_realm'] = "0,example.com,21[3:26]" + hapd = hostapd.add_ap(apdev[0], params) + interworking_select(dev[0], bssid, freq=2412) + dev[0].request("NOTE wpa_config_set(TTLS-EAP-MSCHAPV2-phase2)") + with alloc_fail(dev[0], 1, "wpa_config_parse_str;wpa_config_set;interworking_connect"): + dev[0].request("INTERWORKING_CONNECT " + bssid) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + + dev[0].remove_cred(id) + +def test_ap_hs20_unexpected(dev, apdev): + """Unexpected Hotspot 2.0 AP configuration""" + check_eap_capa(dev[0], "MSCHAPV2") + bssid = apdev[0]['bssid'] + params = hostapd.wpa_eap_params(ssid="test-hs20-fake") + params['wpa'] = "3" + params['wpa_pairwise'] = "TKIP CCMP" + params['rsn_pairwise'] = "CCMP" + params['ieee80211w'] = "1" + #params['vendor_elements'] = 'dd07506f9a10140000' + params['vendor_elements'] = 'dd04506f9a10' + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].scan_for_bss(bssid, freq="2412") + dev[0].connect("test-hs20-fake", key_mgmt="WPA-EAP", eap="TTLS", + pairwise="TKIP", + identity="hs20-test", password="password", + ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2", + scan_freq="2412") + + dev[1].hs20_enable() + dev[1].scan_for_bss(bssid, freq="2412") + dev[1].connect("test-hs20-fake", key_mgmt="WPA-EAP", eap="TTLS", + proto="WPA", + identity="hs20-test", password="password", + ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2", + scan_freq="2412") + + dev[2].hs20_enable() + dev[2].scan_for_bss(bssid, freq="2412") + dev[2].connect("test-hs20-fake", key_mgmt="WPA-EAP", eap="TTLS", + ieee80211w="1", + proto="RSN", pairwise="CCMP", + identity="hs20-test", password="password", + ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2", + scan_freq="2412") + +def test_ap_interworking_element_update(dev, apdev): + """Dynamic Interworking element update""" + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].scan_for_bss(bssid, freq="2412") + bss = dev[0].get_bss(bssid) + logger.info("Before update: " + str(bss)) + if '6b091e0701020000000300' not in bss['ie']: + raise Exception("Expected Interworking element not seen before update") + + # Update configuration parameters related to Interworking element + hapd.set('access_network_type', '2') + hapd.set('asra', '1') + hapd.set('esr', '1') + hapd.set('uesa', '1') + hapd.set('venue_group', '2') + hapd.set('venue_type', '8') + if "OK" not in hapd.request("UPDATE_BEACON"): + raise Exception("UPDATE_BEACON failed") + dev[0].request("BSS_FLUSH 0") + dev[0].scan_for_bss(bssid, freq="2412", force_scan=True) + bss = dev[0].get_bss(bssid) + logger.info("After update: " + str(bss)) + if '6b09f20208020000000300' not in bss['ie']: + raise Exception("Expected Interworking element not seen after update") + +def test_ap_hs20_terms_and_conditions(dev, apdev): + """Hotspot 2.0 Terms and Conditions signaling""" + check_eap_capa(dev[0], "MSCHAPV2") + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['hs20_t_c_filename'] = 'terms-and-conditions' + params['hs20_t_c_timestamp'] = '123456789' + + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].connect("test-hs20", proto="RSN", key_mgmt="WPA-EAP", eap="TTLS", + identity="hs20-t-c-test", password="password", + ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2", + ieee80211w='2', scan_freq="2412") + ev = dev[0].wait_event(["HS20-T-C-ACCEPTANCE"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions Acceptance notification not received") + url = "https://example.com/t_and_c?addr=%s&ap=123" % dev[0].own_addr() + if url not in ev: + raise Exception("Unexpected URL: " + ev) + +def test_ap_hs20_terms_and_conditions_coa(dev, apdev): + """Hotspot 2.0 Terms and Conditions signaling - CoA""" + try: + import pyrad.client + import pyrad.packet + import pyrad.dictionary + import radius_das + except ImportError: + raise HwsimSkip("No pyrad modules available") + + check_eap_capa(dev[0], "MSCHAPV2") + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['hs20_t_c_filename'] = 'terms-and-conditions' + params['hs20_t_c_timestamp'] = '123456789' + params['own_ip_addr'] = "127.0.0.1" + params['radius_das_port'] = "3799" + params['radius_das_client'] = "127.0.0.1 secret" + params['radius_das_require_event_timestamp'] = "1" + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].connect("test-hs20", proto="RSN", key_mgmt="WPA-EAP", eap="TTLS", + identity="hs20-t-c-test", password="password", + ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2", + ieee80211w='2', scan_freq="2412") + + ev = hapd.wait_event(["HS20-T-C-FILTERING-ADD"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions filtering not enabled") + if ev.split(' ')[1] != dev[0].own_addr(): + raise Exception("Unexpected STA address for filtering: " + ev) + + ev = dev[0].wait_event(["HS20-T-C-ACCEPTANCE"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions Acceptance notification not received") + url = "https://example.com/t_and_c?addr=%s&ap=123" % dev[0].own_addr() + if url not in ev: + raise Exception("Unexpected URL: " + ev) + + dict = pyrad.dictionary.Dictionary("dictionary.radius") + + srv = pyrad.client.Client(server="127.0.0.1", acctport=3799, + secret="secret", dict=dict) + srv.retries = 1 + srv.timeout = 1 + + sta = hapd.get_sta(dev[0].own_addr()) + multi_sess_id = sta['authMultiSessionId'] + + logger.info("CoA-Request with matching Acct-Session-Id") + vsa = binascii.unhexlify('00009f68090600000000') + req = radius_das.CoAPacket(dict=dict, secret="secret", + NAS_IP_Address="127.0.0.1", + Acct_Multi_Session_Id=multi_sess_id, + Chargeable_User_Identity="hs20-cui", + Event_Timestamp=int(time.time()), + Vendor_Specific=vsa) + reply = srv.SendPacket(req) + logger.debug("RADIUS response from hostapd") + for i in list(reply.keys()): + logger.debug("%s: %s" % (i, reply[i])) + if reply.code != pyrad.packet.CoAACK: + raise Exception("CoA-Request failed") + + ev = hapd.wait_event(["HS20-T-C-FILTERING-REMOVE"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions filtering not disabled") + if ev.split(' ')[1] != dev[0].own_addr(): + raise Exception("Unexpected STA address for filtering: " + ev) + +def test_ap_hs20_terms_and_conditions_sql(dev, apdev, params): + """Hotspot 2.0 Terms and Conditions using SQLite for user DB""" + addr = dev[0].own_addr() + run_ap_hs20_terms_and_conditions_sql(dev, apdev, params, + "https://example.com/t_and_c?addr=@1@&ap=123", + "https://example.com/t_and_c?addr=" + addr + "&ap=123") + +def test_ap_hs20_terms_and_conditions_sql2(dev, apdev, params): + """Hotspot 2.0 Terms and Conditions using SQLite for user DB""" + addr = dev[0].own_addr() + run_ap_hs20_terms_and_conditions_sql(dev, apdev, params, + "https://example.com/t_and_c?addr=@1@", + "https://example.com/t_and_c?addr=" + addr) + +def run_ap_hs20_terms_and_conditions_sql(dev, apdev, params, url_template, + url_expected): + check_eap_capa(dev[0], "MSCHAPV2") + try: + import sqlite3 + except ImportError: + raise HwsimSkip("No sqlite3 module available") + dbfile = os.path.join(params['logdir'], "eap-user.db") + try: + os.remove(dbfile) + except: + pass + con = sqlite3.connect(dbfile) + with con: + cur = con.cursor() + cur.execute("CREATE TABLE users(identity TEXT PRIMARY KEY, methods TEXT, password TEXT, remediation TEXT, phase2 INTEGER, t_c_timestamp INTEGER)") + cur.execute("CREATE TABLE wildcards(identity TEXT PRIMARY KEY, methods TEXT)") + cur.execute("INSERT INTO users(identity,methods,password,phase2) VALUES ('user-mschapv2','TTLS-MSCHAPV2','password',1)") + cur.execute("INSERT INTO wildcards(identity,methods) VALUES ('','TTLS,TLS')") + cur.execute("CREATE TABLE authlog(timestamp TEXT, session TEXT, nas_ip TEXT, username TEXT, note TEXT)") + cur.execute("CREATE TABLE pending_tc(mac_addr TEXT PRIMARY KEY, identity TEXT)") + cur.execute("CREATE TABLE current_sessions(mac_addr TEXT PRIMARY KEY, identity TEXT, start_time TEXT, nas TEXT, hs20_t_c_filtering BOOLEAN, waiting_coa_ack BOOLEAN, coa_ack_received BOOLEAN)") + + + try: + params = { "ssid": "as", "beacon_int": "2000", + "radius_server_clients": "auth_serv/radius_clients.conf", + "radius_server_auth_port": '18128', + "eap_server": "1", + "eap_user_file": "sqlite:" + dbfile, + "ca_cert": "auth_serv/ca.pem", + "server_cert": "auth_serv/server.pem", + "private_key": "auth_serv/server.key" } + params['hs20_t_c_server_url'] = url_template + authsrv = hostapd.add_ap(apdev[1], params) + + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['auth_server_port'] = "18128" + params['hs20_t_c_filename'] = 'terms-and-conditions' + params['hs20_t_c_timestamp'] = '123456789' + params['own_ip_addr'] = "127.0.0.1" + params['radius_das_port'] = "3799" + params['radius_das_client'] = "127.0.0.1 radius" + params['radius_das_require_event_timestamp'] = "1" + params['disable_pmksa_caching'] = '1' + hapd = hostapd.add_ap(apdev[0], params) + + dev[0].request("SET pmf 1") + dev[0].hs20_enable() + id = dev[0].add_cred_values({ 'realm': "example.com", + 'username': "user-mschapv2", + 'password': "password", + 'ca_cert': "auth_serv/ca.pem" }) + interworking_select(dev[0], bssid, freq="2412") + interworking_connect(dev[0], bssid, "TTLS") + + ev = hapd.wait_event(["HS20-T-C-FILTERING-ADD"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions filtering not enabled") + hapd.dump_monitor() + + ev = dev[0].wait_event(["HS20-T-C-ACCEPTANCE"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions Acceptance notification not received") + url = ev.split(' ')[1] + if url != url_expected: + raise Exception("Unexpected URL delivered to the client: %s (expected %s)" % (url, url_expected)) + dev[0].dump_monitor() + + with con: + cur = con.cursor() + cur.execute("SELECT * from current_sessions") + rows = cur.fetchall() + if len(rows) != 1: + raise Exeception("Unexpected number of rows in current_sessions (%d; expected %d)" % (len(rows), 1)) + logger.info("current_sessions: " + str(rows)) + + if "OK" not in authsrv.request("DAC_REQUEST coa %s t_c_clear" % dev[0].own_addr()): + raise Exception("DAC_REQUEST failed") + + ev = hapd.wait_event(["HS20-T-C-FILTERING-REMOVE"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions filtering not disabled") + if ev.split(' ')[1] != dev[0].own_addr(): + raise Exception("Unexpected STA address for filtering: " + ev) + + time.sleep(0.2) + with con: + cur = con.cursor() + cur.execute("SELECT * from current_sessions") + rows = cur.fetchall() + if len(rows) != 1: + raise Exeception("Unexpected number of rows in current_sessions (%d; expected %d)" % (len(rows), 1)) + logger.info("current_sessions: " + str(rows)) + if rows[0][4] != 0 or rows[0][5] != 0 or rows[0][6] != 1: + raise Exception("Unexpected current_sessions information after CoA-ACK") + + dev[0].request("DISCONNECT") + dev[0].wait_disconnected() + dev[0].dump_monitor() + + # Simulate T&C server operation on user reading the updated version + with con: + cur = con.cursor() + cur.execute("SELECT identity FROM pending_tc WHERE mac_addr='" + + dev[0].own_addr() + "'") + rows = cur.fetchall() + if len(rows) != 1: + raise Exception("No pending_tc entry found") + if rows[0][0] != 'user-mschapv2': + raise Exception("Unexpected pending_tc identity value") + + cur.execute("UPDATE users SET t_c_timestamp=123456789 WHERE identity='user-mschapv2'") + + dev[0].request("RECONNECT") + dev[0].wait_connected() + + ev = hapd.wait_event(["HS20-T-C-FILTERING-ADD"], timeout=0.1) + if ev is not None: + raise Exception("Terms and Conditions filtering enabled unexpectedly") + hapd.dump_monitor() + + ev = dev[0].wait_event(["HS20-T-C-ACCEPTANCE"], timeout=0.1) + if ev is not None: + raise Exception("Unexpected Terms and Conditions Acceptance notification") + dev[0].dump_monitor() + + dev[0].request("DISCONNECT") + dev[0].wait_disconnected() + dev[0].dump_monitor() + + # New T&C available + hapd.set('hs20_t_c_timestamp', '123456790') + + dev[0].request("RECONNECT") + dev[0].wait_connected() + + ev = hapd.wait_event(["HS20-T-C-FILTERING-ADD"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions filtering not enabled") + hapd.dump_monitor() + + ev = dev[0].wait_event(["HS20-T-C-ACCEPTANCE"], timeout=5) + if ev is None: + raise Exception("Terms and Conditions Acceptance notification not received (2)") + dev[0].dump_monitor() + + dev[0].request("DISCONNECT") + dev[0].wait_disconnected() + dev[0].dump_monitor() + + # Simulate T&C server operation on user reading the updated version + with con: + cur = con.cursor() + cur.execute("UPDATE users SET t_c_timestamp=123456790 WHERE identity='user-mschapv2'") + + dev[0].request("RECONNECT") + dev[0].wait_connected() + + ev = hapd.wait_event(["HS20-T-C-FILTERING-ADD"], timeout=0.1) + if ev is not None: + raise Exception("Terms and Conditions filtering enabled unexpectedly") + hapd.dump_monitor() + + ev = dev[0].wait_event(["HS20-T-C-ACCEPTANCE"], timeout=0.1) + if ev is not None: + raise Exception("Unexpected Terms and Conditions Acceptance notification (2)") + dev[0].dump_monitor() + finally: + os.remove(dbfile) + dev[0].request("SET pmf 0") + +def test_ap_hs20_release_number_1(dev, apdev): + """Hotspot 2.0 with AP claiming support for Release 1""" + run_ap_hs20_release_number(dev, apdev, 1) + +def test_ap_hs20_release_number_2(dev, apdev): + """Hotspot 2.0 with AP claiming support for Release 2""" + run_ap_hs20_release_number(dev, apdev, 2) + +def test_ap_hs20_release_number_3(dev, apdev): + """Hotspot 2.0 with AP claiming support for Release 3""" + run_ap_hs20_release_number(dev, apdev, 3) + +def run_ap_hs20_release_number(dev, apdev, release): + check_eap_capa(dev[0], "MSCHAPV2") + eap_test(dev[0], apdev[0], "21[3:26][6:7][99:99]", "TTLS", "user", + release=release) + rel = dev[0].get_status_field('hs20') + if rel != str(release): + raise Exception("Unexpected release number indicated: " + rel) + +def test_ap_hs20_missing_pmf(dev, apdev): + """Hotspot 2.0 connection attempt without PMF""" + check_eap_capa(dev[0], "MSCHAPV2") + bssid = apdev[0]['bssid'] + params = hs20_ap_params() + params['hessid'] = bssid + params['disable_dgaf'] = '1' + hostapd.add_ap(apdev[0], params) + + dev[0].hs20_enable() + dev[0].connect("test-hs20", proto="RSN", key_mgmt="WPA-EAP", eap="TTLS", + ieee80211w="0", + identity="hs20-test", password="password", + ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2", + scan_freq="2412", update_identifier="54321", + roaming_consortium_selection="1020304050", + wait_connect=False) + ev = dev[0].wait_event(["CTRL-EVENT-ASSOC-REJECT"], timeout=10) + dev[0].request("DISCONNECT") + if ev is None: + raise Exception("Association rejection not reported") + if "status_code=31" not in ev: + raise Exception("Unexpected rejection reason: " + ev)