break
return None
- def wait_sta(self, addr=None, timeout=2):
+ def wait_sta(self, addr=None, timeout=2, wait_4way_hs=False):
ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
if ev is None:
raise Exception("AP did not report STA connection")
if addr and addr not in ev:
raise Exception("Unexpected STA address in connection event: " + ev)
+ if wait_4way_hs:
+ ev2 = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"],
+ timeout=timeout)
+ if ev2 is None:
+ raise Exception("AP did not report 4-way handshake completion")
+ if addr and addr not in ev2:
+ raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev2)
+ return ev
+
+ def wait_sta_disconnect(self, addr=None, timeout=2):
+ ev = self.wait_event(["AP-STA-DISCONNECT"], timeout=timeout)
+ if ev is None:
+ raise Exception("AP did not report STA disconnection")
+ if addr and addr not in ev:
+ raise Exception("Unexpected STA address in disconnection event: " + ev)
return ev
def wait_ptkinitdone(self, addr, timeout=2):
res = client.p2p_connect_group(go.p2p_dev_addr(), pin, timeout=60,
social=social, freq=freq)
logger.info("Client connected")
+ go.wait_sta(client.p2p_interface_addr())
hwsim_utils.test_connectivity_p2p(go, client)
return res
r_res = r_dev.p2p_go_neg_auth_result(expect_failure=expect_failure)
logger.debug("i_res: " + str(i_res))
logger.debug("r_res: " + str(r_res))
+ if not expect_failure and i_res and r_res and \
+ i_res['result'] == 'success' and r_res['result'] == 'success':
+ if i_res['role'] == 'GO':
+ i_dev.wait_sta(addr=r_dev.p2p_interface_addr())
+ if r_res['role'] == 'GO':
+ r_dev.wait_sta(addr=i_dev.p2p_interface_addr())
r_dev.dump_monitor()
i_dev.dump_monitor()
if i_go_neg_status:
logger.info("AKA fast re-authentication")
eap_reauth(dev[0], "AKA")
+ ev = hapd.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=1)
+ if ev is None:
+ raise Exception("hostapd did not report 4-way handshake completion")
logger.info("AKA full auth with pseudonym")
with con:
cur = con.cursor()
cur.execute("DELETE FROM reauth WHERE permanent='0232010000000000'")
eap_reauth(dev[0], "AKA")
+ ev = hapd.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=1)
+ if ev is None:
+ raise Exception("hostapd did not report 4-way handshake completion")
logger.info("AKA full auth with permanent identity")
with con:
cur.execute("DELETE FROM reauth WHERE permanent='0232010000000000'")
cur.execute("DELETE FROM pseudonyms WHERE permanent='0232010000000000'")
eap_reauth(dev[0], "AKA")
+ ev = hapd.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=1)
+ if ev is None:
+ raise Exception("hostapd did not report 4-way handshake completion")
logger.info("AKA reauth with mismatching MK")
with con:
cur = con.cursor()
cur.execute("UPDATE reauth SET counter='10' WHERE permanent='0232010000000000'")
eap_reauth(dev[0], "AKA")
+ ev = hapd.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=1)
+ if ev is None:
+ raise Exception("hostapd did not report 4-way handshake completion")
with con:
cur = con.cursor()
cur.execute("UPDATE reauth SET counter='10' WHERE permanent='0232010000000000'")
logger.info("AKA reauth with mismatching counter")
eap_reauth(dev[0], "AKA")
+ ev = hapd.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=1)
+ if ev is None:
+ raise Exception("hostapd did not report 4-way handshake completion")
dev[0].request("REMOVE_NETWORK all")
+ dev[0].wait_disconnected()
+ hapd.wait_sta_disconnect()
eap_connect(dev[0], hapd, "AKA", "0232010000000000",
password="90dca4eda45b53cf0f12d7c9c3bc6a89:cb9cccc4b9258e6dca4760379fb82581:000000000123")
cur.execute("UPDATE reauth SET counter='1001' WHERE permanent='0232010000000000'")
logger.info("AKA reauth with max reauth count reached")
eap_reauth(dev[0], "AKA")
+ ev = hapd.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=1)
+ if ev is None:
+ raise Exception("hostapd did not report 4-way handshake completion")
def test_ap_wpa2_eap_aka_sql_fallback_to_pseudonym_id(dev, apdev, params):
"""WPA2-Enterprise connection using EAP-AKA (SQL) and fallback to pseudonym using AKA-Identity"""
params['anqp_3gpp_cell_net'] = "244,91"
return params
-def check_auto_select(dev, bssid):
+def check_auto_select(dev, bssid, hapd=None):
dev.scan_for_bss(bssid, freq="2412")
dev.request("INTERWORKING_SELECT auto freq=2412")
ev = dev.wait_connected(timeout=15)
if bssid not in ev:
raise Exception("Connected to incorrect network")
+ if hapd:
+ hapd.wait_sta()
dev.request("REMOVE_NETWORK all")
dev.wait_disconnected()
dev.dump_monitor()
+ if hapd:
+ hapd.wait_sta_disconnect()
def interworking_select(dev, bssid, type=None, no_match=False, freq=None):
dev.dump_monitor()
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
dev[0].hs20_enable()
id = dev[0].add_cred_values({'realm': "example.com",
'domain': "example.com"})
interworking_select(dev[0], bssid, "home", freq="2412")
interworking_connect(dev[0], bssid, "TTLS")
+ hapd.wait_sta()
logger.info("Verifying GAS query while associated")
dev[0].request("FETCH_ANQP")
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params()
'domain': "example.com"})
interworking_select(dev[0], bssid, "home", freq="2412")
interworking_connect(dev[0], bssid, "TTLS")
+ hapd.wait_sta()
dev[0].dump_monitor()
logger.info("Verifying GAS query with same AP while associated")
check_eap_capa(dev[0], "MSCHAPV2")
bssid = apdev[0]['bssid']
params = hs20_ap_params()
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
dev[0].hs20_enable()
dev[0].scan_for_bss(bssid, freq="2412")
check_conn_capab_selection(dev[0], "roaming", True)
logger.info("Verify that req_conn_capab does not prevent connection if no other network is available")
- check_auto_select(dev[0], bssid)
+ check_auto_select(dev[0], bssid, hapd=hapd)
logger.info("Additional req_conn_capab checks")
params = hs20_ap_params()
params['domain_name'] = "roaming.example.org"
params['hs20_conn_capab'] = ["1:0:2", "6:22:1", "17:5060:0", "50:0:1"]
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params(ssid="test-hs20-b")
params['domain_name'] = "roaming.example.net"
- hostapd.add_ap(apdev[1], params)
+ hapd2 = hostapd.add_ap(apdev[1], params)
values = default_cred()
values['roaming_partner'] = "roaming.example.net,1,127,*"
id = dev[0].add_cred_values(values)
- check_auto_select(dev[0], bssid2)
+ check_auto_select(dev[0], bssid2, hapd=hapd2)
dev[0].set_cred(id, "req_conn_capab", "50")
- check_auto_select(dev[0], bssid)
+ check_auto_select(dev[0], bssid, hapd=hapd)
dev[0].remove_cred(id)
id = dev[0].add_cred_values(values)
dev[0].set_cred(id, "req_conn_capab", "51")
- check_auto_select(dev[0], bssid2)
+ check_auto_select(dev[0], bssid2, hapd=hapd2)
def check_bandwidth_selection(dev, type, below):
dev.request("INTERWORKING_SELECT freq=2412")
check_eap_capa(dev[0], "MSCHAPV2")
bssid = apdev[0]['bssid']
params = hs20_ap_params()
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
dev[0].hs20_enable()
dev[0].scan_for_bss(bssid, freq="2412")
values = bw_cred(domain="example.com", dl_home=5491, ul_home=59)
id = dev[0].add_cred_values(values)
check_bandwidth_selection(dev[0], "home", True)
- check_auto_select(dev[0], bssid)
+ check_auto_select(dev[0], bssid, hapd=hapd)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params(ssid="test-hs20-b")
params['hs20_wan_metrics'] = "01:8000:1000:1:1:3000"
- hostapd.add_ap(apdev[1], params)
+ hapd2 = hostapd.add_ap(apdev[1], params)
- check_auto_select(dev[0], bssid2)
+ check_auto_select(dev[0], bssid2, hapd=hapd2)
def test_ap_hs20_min_bandwidth_home2(dev, apdev):
"""Hotspot 2.0 network selection with min bandwidth - special cases"""
hapd_global.remove(apdev[0]['ifname'])
params = hs20_ap_params()
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
dev[0].hs20_enable()
dev[0].scan_for_bss(bssid, freq="2412")
values = bw_cred(domain="example.com", dl_home=5491, ul_home=59)
id = dev[0].add_cred_values(values)
check_bandwidth_selection(dev[0], "home", True)
- check_auto_select(dev[0], bssid)
+ check_auto_select(dev[0], bssid, hapd=hapd)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params(ssid="test-hs20-b")
params['hs20_wan_metrics'] = "01:8000:1000:1:1:3000"
- hostapd.add_ap(apdev[1], params)
+ hapd2 = hostapd.add_ap(apdev[1], params)
- check_auto_select(dev[0], bssid2)
+ check_auto_select(dev[0], bssid2, hapd=hapd2)
dev[0].flush_scan_cache()
check_eap_capa(dev[0], "MSCHAPV2")
bssid = apdev[0]['bssid']
params = hs20_ap_params()
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
dev[0].hs20_enable()
dev[0].scan_for_bss(bssid, freq="2412")
values = bw_cred(domain="example.org", dl_roaming=5491, ul_roaming=59)
id = dev[0].add_cred_values(values)
check_bandwidth_selection(dev[0], "roaming", True)
- check_auto_select(dev[0], bssid)
+ check_auto_select(dev[0], bssid, hapd=hapd)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params(ssid="test-hs20-b")
params['hs20_wan_metrics'] = "01:8000:1000:1:1:3000"
- hostapd.add_ap(apdev[1], params)
+ hapd2 = hostapd.add_ap(apdev[1], params)
- check_auto_select(dev[0], bssid2)
+ check_auto_select(dev[0], bssid2, hapd=hapd2)
def test_ap_hs20_min_bandwidth_and_roaming_partner_preference(dev, apdev):
"""Hotspot 2.0 and minimum bandwidth with roaming partner preference"""
params = hs20_ap_params()
params['domain_name'] = "roaming.example.org"
params['hs20_wan_metrics'] = "01:8000:1000:1:1:3000"
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params(ssid="test-hs20-b")
params['domain_name'] = "roaming.example.net"
- hostapd.add_ap(apdev[1], params)
+ hapd2 = hostapd.add_ap(apdev[1], params)
values = default_cred()
values['roaming_partner'] = "roaming.example.net,1,127,*"
id = dev[0].add_cred_values(values)
- check_auto_select(dev[0], bssid2)
+ check_auto_select(dev[0], bssid2, hapd=hapd2)
dev[0].set_cred(id, "min_dl_bandwidth_roaming", "6000")
- check_auto_select(dev[0], bssid)
+ check_auto_select(dev[0], bssid, hapd=hapd)
dev[0].set_cred(id, "min_dl_bandwidth_roaming", "10000")
- check_auto_select(dev[0], bssid2)
+ check_auto_select(dev[0], bssid2, hapd=hapd2)
def test_ap_hs20_min_bandwidth_no_wan_metrics(dev, apdev):
"""Hotspot 2.0 network selection with min bandwidth but no WAN Metrics"""
bssid=apdev[0]['bssid'])
dev[1].connect(ssid, psk=passphrase, scan_freq="2412",
bssid=apdev[0]['bssid'])
- hapd.wait_sta()
- hapd.wait_sta()
+ hapd.wait_sta(wait_4way_hs=True)
+ hapd.wait_sta(wait_4way_hs=True)
addr0 = dev[0].p2p_interface_addr()
hwsim_utils.test_connectivity_sta(dev[0], dev[1])
err, macs1 = hapd.cmd_execute(['brctl', 'showmacs', 'ap-br0'])
params['qos_map_set'] = '53,2,22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55'
hapd = hostapd.add_ap(apdev[0], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
+ hapd.wait_sta()
time.sleep(0.1)
addr = dev[0].p2p_interface_addr()
dev[0].request("DATA_TEST_CONFIG 1")
params = {"ssid": ssid}
hapd = hostapd.add_ap(apdev[0], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
+ hapd.wait_sta()
+ time.sleep(0.1)
addr = dev[0].p2p_interface_addr()
dev[0].request("DATA_TEST_CONFIG 1")
hapd.request("DATA_TEST_CONFIG 1")
hapd = hostapd.add_ap(apdev[0], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
addr = dev[0].p2p_interface_addr()
+ hapd.wait_sta()
+ time.sleep(0.1)
dev[0].request("DATA_TEST_CONFIG 1")
hapd.request("DATA_TEST_CONFIG 1")
Wlantest.setup(hapd)
raise Exception("SET_QOS_MAP_SET accepted during forced driver failure")
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
+ hapd.wait_sta()
+ time.sleep(0.1)
with alloc_fail(hapd, 1,
"wpabuf_alloc;hostapd_ctrl_iface_send_qos_map_conf"):
if "FAIL" not in hapd.request("SEND_QOS_MAP_CONF " + dev[0].own_addr()):
time.sleep(.1)
dev[0].connect("test-vlan", psk="12345678x", scan_freq="2412")
+ hapd.wait_sta()
# inject some traffic
sa = hapd.own_addr()
wpas.interface_remove("wlan5")
+ for j in range(3):
+ ev = hapd.wait_event(["WPS-ENROLLEE-SEEN"], timeout=1)
+ if ev:
+ logger.info("Ignored extra event at the end: " + ev)
hapd.dump_monitor()
logger.info("Seen UUIDs: " + str(uuid))
dev[0].request("REMOVE_NETWORK all")
dev[0].dump_monitor()
+ hapd2.dump_monitor()
tests = ["eap_sim_msg_add_encr_start;eap_sim_response_notification",
"aes_128_cbc_encrypt;eap_sim_response_notification"]
for func in tests:
eap="SIM", identity="1232010000000000",
phase1="result_ind=1",
password="90dca4eda45b53cf0f12d7c9c3bc6a89:cb9cccc4b9258e6dca4760379fb82581")
+ hapd2.wait_sta()
dev[0].request("REAUTHENTICATE")
ev = dev[0].wait_event(["CTRL-EVENT-EAP-METHOD"], timeout=5)
if ev is None:
wait_fail_trigger(dev[0], "GET_FAIL")
dev[0].request("REMOVE_NETWORK all")
dev[0].dump_monitor()
+ hapd2.wait_sta_disconnect()
+ hapd2.dump_monitor()
tests = ["eap_sim_parse_encr;eap_sim_process_notification_reauth"]
for func in tests:
with alloc_fail(dev[0], 1, func):
eap="SIM", identity="1232010000000000",
phase1="result_ind=1",
password="90dca4eda45b53cf0f12d7c9c3bc6a89:cb9cccc4b9258e6dca4760379fb82581")
+ hapd2.wait_sta()
dev[0].request("REAUTHENTICATE")
ev = dev[0].wait_event(["CTRL-EVENT-EAP-METHOD"], timeout=5)
if ev is None:
wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
dev[0].request("REMOVE_NETWORK all")
dev[0].dump_monitor()
+ hapd2.wait_sta_disconnect()
def test_eap_proto_aka_errors(dev, apdev):
"""EAP-AKA protocol tests (error paths)"""
dev[0].wait_disconnected()
dev[0].dump_monitor()
+ hapd2.dump_monitor()
tests = ["eap_sim_msg_add_encr_start;eap_aka_response_notification",
"aes_128_cbc_encrypt;eap_aka_response_notification"]
for func in tests:
eap="AKA", identity="0232010000000000",
phase1="result_ind=1",
password="90dca4eda45b53cf0f12d7c9c3bc6a89:cb9cccc4b9258e6dca4760379fb82581:000000000123")
+ hapd2.wait_sta()
dev[0].request("REAUTHENTICATE")
ev = dev[0].wait_event(["CTRL-EVENT-EAP-METHOD"], timeout=5)
if ev is None:
wait_fail_trigger(dev[0], "GET_FAIL")
dev[0].request("REMOVE_NETWORK all")
dev[0].dump_monitor()
+ hapd2.wait_sta_disconnect()
+ hapd2.dump_monitor()
tests = ["eap_sim_parse_encr;eap_aka_process_notification_reauth"]
for func in tests:
with alloc_fail(dev[0], 1, func):
eap="AKA", identity="0232010000000000",
phase1="result_ind=1",
password="90dca4eda45b53cf0f12d7c9c3bc6a89:cb9cccc4b9258e6dca4760379fb82581:000000000123")
+ hapd2.wait_sta()
dev[0].request("REAUTHENTICATE")
ev = dev[0].wait_event(["CTRL-EVENT-EAP-METHOD"], timeout=5)
if ev is None:
wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
dev[0].request("REMOVE_NETWORK all")
dev[0].dump_monitor()
+ hapd2.wait_sta_disconnect()
def test_eap_proto_aka_prime_errors(dev, apdev):
"""EAP-AKA' protocol tests (error paths)"""
if pmksa is None:
raise Exception("No PMKSA cache entry created")
+ hapd.wait_sta()
dev[0].request("DISCONNECT")
dev[0].wait_disconnected()
+ hapd.wait_sta_disconnect()
dev[0].dump_monitor()
dev[0].select_network(id, freq=2412)
raise Exception("Connection using PMKSA caching timed out")
if "CTRL-EVENT-EAP-STARTED" in ev:
raise Exception("Unexpected EAP exchange")
+ hapd.wait_sta()
hwsim_utils.test_connectivity(dev[0], hapd)
pmksa2 = dev[0].get_pmksa(bssid)
if pmksa2 is None:
ev = dev[0].wait_event(["CTRL-EVENT-EAP-SUCCESS"], timeout=5)
if ev is None:
raise Exception("EAP authentication did not succeed")
+ ev = hapd.wait_event(["CTRL-EVENT-EAP-SUCCESS2"], timeout=1)
+ if ev is None:
+ raise Exception("hostapd did not report EAP-Success on reauth")
time.sleep(0.1)
hwsim_utils.test_connectivity(dev[0], hapd)
if "ffee" not in res1:
raise Exception("FILS Cache Identifier not seen in PMKSA cache entry")
+ hapd.wait_sta()
dev[0].request("DISCONNECT")
dev[0].wait_disconnected()
hapd_as.disable()
sta = sta1.get_instance()
sta.request("DISCONNECT")
sta.wait_disconnected()
+ ap1.hapd.wait_sta_disconnect()
req = "FST-MANAGER SESSION_RESPOND %s reject" % ev['id']
s = ap1.grequest(req)
if not s.startswith("FAIL"):
dev[2].dump_monitor()
dev[2].request("WPS_PIN any " + pin)
dev[2].wait_connected(timeout=30)
+ dev[0].wait_sta(addr=dev[2].own_addr())
status = dev[2].get_status()
if status['wpa_state'] != 'COMPLETED':
raise Exception("Not fully connected")
dev[2].request("DISCONNECT")
+ dev[2].wait_disconnected()
+ dev[0].wait_sta_disconnect()
logger.info("Connect legacy non-WPS client")
dev[2].request("FLUSH")
dev[2].connect(ssid=res['ssid'], psk=res['passphrase'], proto='RSN',
key_mgmt='WPA-PSK', pairwise='CCMP', group='CCMP',
scan_freq=res['freq'])
+ dev[0].wait_sta(addr=dev[2].own_addr())
hwsim_utils.test_connectivity_p2p_sta(dev[1], dev[2])
dev[2].request("DISCONNECT")
pin = dev[2].wps_read_pin()
dev[0].p2p_go_authorize_client(pin)
c_res = dev[2].p2p_connect_group(dev[0].p2p_dev_addr(), pin, timeout=60)
+ dev[0].wait_sta(dev[2].p2p_interface_addr())
check_grpform_results(i_res, c_res)
if r_res['psk'] == c_res['psk']:
raise Exception("Timeout on group restart")
dev[i].group_form_result(ev)
+ dev[0].wait_sta()
+ dev[0].wait_sta()
+
logger.info("Leave persistent group and rejoin it")
dev[2].remove_group()
ev = dev[2].wait_global_event(["P2P-GROUP-REMOVED"], timeout=3)
if ev is None:
raise Exception("Group removal event timed out")
+ dev[0].wait_sta_disconnect()
if not dev[2].discover_peer(addr0, social=True):
raise Exception("Peer " + addr0 + " not found")
dev[2].dump_monitor()
cli_res = dev[2].group_form_result(ev)
if not cli_res['persistent']:
raise Exception("Persistent group not restarted as persistent (cli)")
+ dev[0].wait_sta(addr=dev[2].p2p_interface_addr())
hwsim_utils.test_connectivity_p2p(dev[1], dev[2])
logger.info("Remove one of the clients from the group without removing persistent group information for the client")
eap="GPSK", identity="gpsk user",
password="abcdefghijklmnop0123456789abcdef",
scan_freq="2412")
+ hapd.wait_sta()
res1 = dev[0].request("PMKSA_GET %d" % id)
logger.info("PMKSA_GET: " + res1)
dev[0].wait_disconnected()
dev[0].dump_monitor()
dev[0].request("PMKSA_FLUSH")
+ hapd.wait_sta_disconnect()
id = dev[0].connect("test-pmksa-cache", proto="RSN", key_mgmt="FT-EAP",
eap="GPSK", identity="gpsk user",
val = dev[0].get_status_field("sae_pk")
if val != str(expected):
raise Exception("Unexpected sae_pk=%d result %s" % (sae_pk, val))
+ hapd.wait_sta()
dev[0].request("REMOVE_NETWORK *")
dev[0].wait_disconnected()
dev[0].dump_monitor()
+ hapd.wait_sta_disconnect()
def test_sae_pk_not_on_ap(dev, apdev):
"""SAE-PK password, but no PK on AP"""
dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
addr = dev[0].p2p_interface_addr()
+ hapd.wait_sta(wait_4way_hs=True)
hapd.request("ESS_DISASSOC " + addr + " 10 http://example.com/session-info")
ev = dev[0].wait_event(["ESS-DISASSOC-IMMINENT"])
if ev is None:
dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2", ocv="1",
key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
+ hapd.wait_sta()
# Failed to allocate buffer for OCI element in WNM-Sleep Mode frame
with alloc_fail(hapd, 2, "ieee802_11_send_wnmsleep_resp"):
if "OK" not in dev[0].request("WNM_SLEEP enter"):
if expect_failure:
return None
raise Exception("Group formation timed out")
- self.dump_monitor()
return self.group_form_result(ev, expect_failure, go_neg_res)
def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None,
vals['kdk'] = kdk
return vals
return None
+
+ def wait_sta(self, addr=None, timeout=2, wait_4way_hs=False):
+ ev = self.wait_group_event(["AP-STA-CONNECT"], timeout=timeout)
+ if ev is None:
+ ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
+ if ev is None:
+ raise Exception("AP did not report STA connection")
+ if addr and addr not in ev:
+ raise Exception("Unexpected STA address in connection event: " + ev)
+ if wait_4way_hs:
+ ev2 = self.wait_group_event(["EAPOL-4WAY-HS-COMPLETED"],
+ timeout=timeout)
+ if ev2 is None:
+ raise Exception("AP did not report 4-way handshake completion")
+ if addr and addr not in ev2:
+ raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev2)
+ return ev
+
+ def wait_sta_disconnect(self, addr=None, timeout=2):
+ ev = self.wait_group_event(["AP-STA-DISCONNECT"], timeout=timeout)
+ if ev is None:
+ ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
+ if ev is None:
+ raise Exception("AP did not report STA disconnection")
+ if addr and addr not in ev:
+ raise Exception("Unexpected STA address in disconnection event: " + ev)
+ return ev