def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
ifname1=None, ifname2=None, config=True, timeout=5,
- multicast_to_unicast=False):
+ multicast_to_unicast=False, broadcast=True):
addr1 = dev1.own_addr()
if not dev1group and isinstance(dev1, WpaSupplicant):
addr1 = dev1.get_driver_status_field('addr')
if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
raise Exception("Unexpected dev1->dev2 unicast data result")
- cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
- for i in xrange(broadcast_retry_c):
- try:
- if dev1group:
- dev1.group_request(cmd)
- else:
- dev1.request(cmd)
- if dev2group:
- ev = dev2.wait_group_event(["DATA-TEST-RX"],
- timeout=timeout)
- else:
- ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
- if ev is None:
- raise Exception("dev1->dev2 broadcast data delivery failed")
- if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
- raise Exception("Unexpected dev1->dev2 broadcast data result")
- break
- except Exception as e:
- if i == broadcast_retry_c - 1:
- raise
+ if broadcast:
+ cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
+ for i in xrange(broadcast_retry_c):
+ try:
+ if dev1group:
+ dev1.group_request(cmd)
+ else:
+ dev1.request(cmd)
+ if dev2group:
+ ev = dev2.wait_group_event(["DATA-TEST-RX"],
+ timeout=timeout)
+ else:
+ ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
+ if ev is None:
+ raise Exception("dev1->dev2 broadcast data delivery failed")
+ if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
+ raise Exception("Unexpected dev1->dev2 broadcast data result")
+ break
+ except Exception as e:
+ if i == broadcast_retry_c - 1:
+ raise
cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
if dev2group:
if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
raise Exception("Unexpected dev2->dev1 unicast data result")
- cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
- for i in xrange(broadcast_retry_c):
- try:
- if dev2group:
- dev2.group_request(cmd)
- else:
- dev2.request(cmd)
- if dev1group:
- ev = dev1.wait_group_event(["DATA-TEST-RX"],
- timeout=timeout)
- else:
- ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
- if ev is None:
- raise Exception("dev2->dev1 broadcast data delivery failed")
- if multicast_to_unicast:
- if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev:
- raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing")
- if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
- raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)")
- else:
- if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
- raise Exception("Unexpected dev2->dev1 broadcast data result")
- break
- except Exception as e:
- if i == broadcast_retry_c - 1:
- raise
+ if broadcast:
+ cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
+ for i in xrange(broadcast_retry_c):
+ try:
+ if dev2group:
+ dev2.group_request(cmd)
+ else:
+ dev2.request(cmd)
+ if dev1group:
+ ev = dev1.wait_group_event(["DATA-TEST-RX"],
+ timeout=timeout)
+ else:
+ ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
+ if ev is None:
+ raise Exception("dev2->dev1 broadcast data delivery failed")
+ if multicast_to_unicast:
+ if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev:
+ raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing")
+ if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
+ raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)")
+ else:
+ if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
+ raise Exception("Unexpected dev2->dev1 broadcast data result")
+ break
+ except Exception as e:
+ if i == broadcast_retry_c - 1:
+ raise
finally:
if config:
if dev1group:
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
dev1group=False, dev2group=False,
ifname1=None, ifname2=None, config=True, timeout=5,
- multicast_to_unicast=False, success_expected=True):
+ multicast_to_unicast=False, success_expected=True,
+ broadcast=True):
if dscp:
tos = dscp << 2
if not tos:
run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
ifname1, ifname2, config=config,
timeout=timeout,
- multicast_to_unicast=multicast_to_unicast)
+ multicast_to_unicast=multicast_to_unicast,
+ broadcast=broadcast)
success = True
break
except Exception, e:
a = int(after[i], 16)
if a < b:
raise Exception("RX counter decreased: idx=%d before=%d after=%d" % (i, b, a))
+
+def test_ap_wpa2_delayed_m1_m3_zero_tk(dev, apdev):
+ """Delayed M1+M3 retransmission and zero TK"""
+ params = hostapd.wpa2_params(ssid="test-wpa2-psk", passphrase="12345678")
+ hapd = hostapd.add_ap(apdev[0], params)
+
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
+ dev[0].connect("test-wpa2-psk", psk="12345678", scan_freq="2412")
+
+ hwsim_utils.test_connectivity(dev[0], hapd)
+ addr = dev[0].own_addr()
+ if "OK" not in hapd.request("RESEND_M1 " + addr + " change-anonce"):
+ raise Exception("RESEND_M1 failed")
+ if "OK" not in hapd.request("RESEND_M1 " + addr):
+ raise Exception("RESEND_M1 failed")
+ if "OK" not in hapd.request("RESEND_M3 " + addr):
+ raise Exception("RESEND_M3 failed")
+
+ if "OK" not in hapd.request("SET_KEY 3 %s %d %d %s %s" % (addr, 0, 1, 6*"00", 16*"00")):
+ raise Exception("SET_KEY failed")
+ time.sleep(0.1)
+ hwsim_utils.test_connectivity(dev[0], hapd, timeout=1, broadcast=False,
+ success_expected=False)
+ dev[0].request("DISCONNECT")
+ dev[0].wait_disconnected()