from wpasupplicant import WpaSupplicant
def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
- ifname1=None, ifname2=None, config=True):
+ ifname1=None, ifname2=None, config=True, timeout=5):
addr1 = dev1.own_addr()
if not dev1group and isinstance(dev1, WpaSupplicant):
addr1 = dev1.get_driver_status_field('addr')
else:
dev1.request(cmd)
if dev2group:
- ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5)
+ ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
else:
- ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5)
+ ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
if ev is None:
raise Exception("dev1->dev2 unicast data delivery failed")
if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
else:
dev1.request(cmd)
if dev2group:
- ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5)
+ ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
else:
- ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5)
+ ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
if ev is None:
raise Exception("dev1->dev2 broadcast data delivery failed")
if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
else:
dev2.request(cmd)
if dev1group:
- ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5)
+ ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
else:
- ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5)
+ ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
if ev is None:
raise Exception("dev2->dev1 unicast data delivery failed")
if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
else:
dev2.request(cmd)
if dev1group:
- ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5)
+ ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
else:
- ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5)
+ ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
if ev is None:
raise Exception("dev2->dev1 broadcast data delivery failed")
if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
dev1group=False, dev2group=False,
- ifname1=None, ifname2=None, config=True):
+ ifname1=None, ifname2=None, config=True, timeout=5):
if dscp:
tos = dscp << 2
if not tos:
for i in range(0, max_tries):
try:
run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
- ifname1, ifname2, config=config)
+ ifname1, ifname2, config=config,
+ timeout=timeout)
success = True
break
except Exception, e:
# Test connectivity 0->1 and 1->0
hwsim_utils.test_connectivity(dev[0], dev[1])
+
+def test_wpas_mesh_password_mismatch(dev, apdev):
+ """Mesh network and one device with mismatching password"""
+ check_mesh_support(dev[0], secure=True)
+ dev[0].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[0])
+ dev[0].mesh_group_add(id)
+
+ dev[1].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[1])
+ dev[1].mesh_group_add(id)
+
+ dev[2].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[2])
+ dev[2].set_network_quoted(id, "psk", "wrong password")
+ dev[2].mesh_group_add(id)
+
+ # The two peers with matching password need to be able to connect
+ check_mesh_group_added(dev[0])
+ check_mesh_group_added(dev[1])
+ check_mesh_peer_connected(dev[0])
+ check_mesh_peer_connected(dev[1])
+
+ ev = dev[2].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=20)
+ if ev is None:
+ raise Exception("dev2 did not report auth failure (1)")
+ ev = dev[2].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=20)
+ if ev is None:
+ raise Exception("dev2 did not report auth failure (2)")
+
+ ev = dev[0].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=1)
+ if ev is None:
+ raise Exception("dev0 did not report auth failure")
+ if "addr=" + dev[2].own_addr() not in ev:
+ raise Exception("Unexpected peer address in dev0 event: " + ev)
+
+ ev = dev[1].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=1)
+ if ev is None:
+ raise Exception("dev1 did not report auth failure")
+ if "addr=" + dev[2].own_addr() not in ev:
+ raise Exception("Unexpected peer address in dev1 event: " + ev)
+
+ hwsim_utils.test_connectivity(dev[0], dev[1])
+
+ for i in range(2):
+ try:
+ hwsim_utils.test_connectivity(dev[i], dev[2], timeout=1)
+ raise Exception("Data connectivity test passed unexpectedly")
+ except Exception, e:
+ if "data delivery failed" not in str(e):
+ raise
+
+def test_wpas_mesh_password_mismatch_retry(dev, apdev, params):
+ """Mesh password mismatch and retry [long]"""
+ if not params['long']:
+ raise HwsimSkip("Skip test case with long duration due to --long not specified")
+ check_mesh_support(dev[0], secure=True)
+ dev[0].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[0])
+ dev[0].mesh_group_add(id)
+
+ dev[1].request("SET sae_groups ")
+ id = add_mesh_secure_net(dev[1])
+ dev[1].set_network_quoted(id, "psk", "wrong password")
+ dev[1].mesh_group_add(id)
+
+ # Check for mesh joined
+ check_mesh_group_added(dev[0])
+ check_mesh_group_added(dev[1])
+
+ for i in range(4):
+ ev = dev[0].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=20)
+ if ev is None:
+ raise Exception("dev0 did not report auth failure (%d)" % i)
+ ev = dev[1].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=20)
+ if ev is None:
+ raise Exception("dev1 did not report auth failure (%d)" % i)
+
+ ev = dev[0].wait_event(["MESH-SAE-AUTH-BLOCKED"], timeout=10)
+ if ev is None:
+ raise Exception("dev0 did not report auth blocked")
+ ev = dev[1].wait_event(["MESH-SAE-AUTH-BLOCKED"], timeout=10)
+ if ev is None:
+ raise Exception("dev1 did not report auth blocked")