from wpasupplicant import WpaSupplicant
+def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2):
+ cmd = "DATA_TEST_CONFIG 1"
+ if ifname1:
+ cmd = cmd + " ifname=" + ifname1
+ if dev1group:
+ res = dev1.group_request(cmd)
+ else:
+ res = dev1.request(cmd)
+ if "OK" not in res:
+ raise Exception("Failed to enable data test functionality")
+
+ cmd = "DATA_TEST_CONFIG 1"
+ if ifname2:
+ cmd = cmd + " ifname=" + ifname2
+ if dev2group:
+ res = dev2.group_request(cmd)
+ else:
+ res = dev2.request(cmd)
+ if "OK" not in res:
+ raise Exception("Failed to enable data test functionality")
+
+def run_multicast_connectivity_test(dev1, dev2, tos=None,
+ dev1group=False, dev2group=False,
+ ifname1=None, ifname2=None,
+ config=True, timeout=5,
+ send_len=None, multicast_to_unicast=False,
+ broadcast_retry_c=1):
+ addr1 = dev1.get_addr(dev1group)
+ addr2 = dev2.get_addr(dev2group)
+
+ if config:
+ config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)
+
+ cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
+ if send_len is not None:
+ cmd += " len=" + str(send_len)
+ for i in range(broadcast_retry_c):
+ try:
+ if dev1group:
+ dev1.group_request(cmd)
+ else:
+ dev1.request(cmd)
+ if dev2group:
+ ev = dev2.wait_group_event(["DATA-TEST-RX"],
+ timeout=timeout)
+ else:
+ ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
+ if ev is None:
+ raise Exception("dev1->dev2 broadcast data delivery failed")
+ if multicast_to_unicast:
+ if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) in ev:
+ raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing")
+ if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
+ raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)")
+ else:
+ if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
+ raise Exception("Unexpected dev1->dev2 broadcast data result")
+ if send_len is not None:
+ if " len=" + str(send_len) not in ev:
+ raise Exception("Unexpected dev1->dev2 broadcast data length")
+ else:
+ if " len=" in ev:
+ raise Exception("Unexpected dev1->dev2 broadcast data length")
+ break
+ except Exception as e:
+ if i == broadcast_retry_c - 1:
+ raise
+
def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
ifname1=None, ifname2=None, config=True, timeout=5,
multicast_to_unicast=False, broadcast=True,
send_len=None):
- addr1 = dev1.own_addr()
- if not dev1group and isinstance(dev1, WpaSupplicant):
- addr = dev1.get_driver_status_field('addr')
- if addr:
- addr1 = addr
-
- addr2 = dev2.own_addr()
- if not dev2group and isinstance(dev2, WpaSupplicant):
- addr = dev2.get_driver_status_field('addr')
- if addr:
- addr2 = addr
+ addr1 = dev1.get_addr(dev1group)
+ addr2 = dev2.get_addr(dev2group)
dev1.dump_monitor()
dev2.dump_monitor()
try:
if config:
- cmd = "DATA_TEST_CONFIG 1"
- if ifname1:
- cmd = cmd + " ifname=" + ifname1
- if dev1group:
- res = dev1.group_request(cmd)
- else:
- res = dev1.request(cmd)
- if "OK" not in res:
- raise Exception("Failed to enable data test functionality")
-
- cmd = "DATA_TEST_CONFIG 1"
- if ifname2:
- cmd = cmd + " ifname=" + ifname2
- if dev2group:
- res = dev2.group_request(cmd)
- else:
- res = dev2.request(cmd)
- if "OK" not in res:
- raise Exception("Failed to enable data test functionality")
+ config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)
cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
if send_len is not None:
raise Exception("Unexpected dev1->dev2 unicast data length")
if broadcast:
- cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
- if send_len is not None:
- cmd += " len=" + str(send_len)
- for i in range(broadcast_retry_c):
- try:
- if dev1group:
- dev1.group_request(cmd)
- else:
- dev1.request(cmd)
- if dev2group:
- ev = dev2.wait_group_event(["DATA-TEST-RX"],
- timeout=timeout)
- else:
- ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
- if ev is None:
- raise Exception("dev1->dev2 broadcast data delivery failed")
- if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
- raise Exception("Unexpected dev1->dev2 broadcast data result")
- if send_len is not None:
- if " len=" + str(send_len) not in ev:
- raise Exception("Unexpected dev1->dev2 broadcast data length")
- else:
- if " len=" in ev:
- raise Exception("Unexpected dev1->dev2 broadcast data length")
- break
- except Exception as e:
- if i == broadcast_retry_c - 1:
- raise
+ run_multicast_connectivity_test(dev1, dev2, tos,
+ dev1group, dev2group,
+ ifname1, ifname2, False, timeout,
+ send_len, False, broadcast_retry_c)
cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
if send_len is not None:
raise Exception("Unexpected dev2->dev1 unicast data length")
if broadcast:
- cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
- if send_len is not None:
- cmd += " len=" + str(send_len)
- for i in range(broadcast_retry_c):
- try:
- if dev2group:
- dev2.group_request(cmd)
- else:
- dev2.request(cmd)
- if dev1group:
- ev = dev1.wait_group_event(["DATA-TEST-RX"],
- timeout=timeout)
- else:
- ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
- if ev is None:
- raise Exception("dev2->dev1 broadcast data delivery failed")
- if multicast_to_unicast:
- if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev:
- raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing")
- if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
- raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)")
- else:
- if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
- raise Exception("Unexpected dev2->dev1 broadcast data result")
- if send_len is not None:
- if " len=" + str(send_len) not in ev:
- raise Exception("Unexpected dev2->dev1 broadcast data length")
- else:
- if " len=" in ev:
- raise Exception("Unexpected dev2->dev1 broadcast data length")
- break
- except Exception as e:
- if i == broadcast_retry_c - 1:
- raise
+ run_multicast_connectivity_test(dev2, dev1, tos,
+ dev2group, dev1group,
+ ifname2, ifname1, False, timeout,
+ send_len, multicast_to_unicast,
+ broadcast_retry_c)
+
finally:
if config:
if dev1group: