]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: Factor out multicast connectivity check
authorThomas Pedersen <thomas@adapt-ip.com>
Wed, 5 Feb 2020 07:13:48 +0000 (23:13 -0800)
committerJouni Malinen <j@w1.fi>
Sat, 15 Feb 2020 16:36:29 +0000 (18:36 +0200)
A test may want to check multicast connectivity independent of unicast
or check multicast without exercising unicast first. Factor out the
multicast connectivity check code into its own function.

Signed-off-by: Thomas Pedersen <thomas@adapt-ip.com>
tests/hwsim/hostapd.py
tests/hwsim/hwsim_utils.py
tests/hwsim/wpasupplicant.py

index 67e8a7fb8cdcfe2721d49036d29c828a59d5111b..fac287e99978ebd9608c863dd90b8c972a60bc5b 100644 (file)
@@ -179,6 +179,9 @@ class Hostapd:
             self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
         return self.bssid
 
+    def get_addr(self, group=False):
+        return self.own_addr()
+
     def request(self, cmd):
         logger.debug(self.dbg + ": CTRL: " + cmd)
         return self.ctrl.request(cmd)
index dccdefc71c563f01d8812ec528ad9dfa392d951f..eb312bf96b2b8d14a51a8a049259738a4b87d2c1 100644 (file)
@@ -11,21 +11,80 @@ logger = logging.getLogger()
 
 from wpasupplicant import WpaSupplicant
 
+def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2):
+    cmd = "DATA_TEST_CONFIG 1"
+    if ifname1:
+        cmd = cmd + " ifname=" + ifname1
+    if dev1group:
+        res = dev1.group_request(cmd)
+    else:
+        res = dev1.request(cmd)
+    if "OK" not in res:
+        raise Exception("Failed to enable data test functionality")
+
+    cmd = "DATA_TEST_CONFIG 1"
+    if ifname2:
+        cmd = cmd + " ifname=" + ifname2
+    if dev2group:
+        res = dev2.group_request(cmd)
+    else:
+        res = dev2.request(cmd)
+    if "OK" not in res:
+        raise Exception("Failed to enable data test functionality")
+
+def run_multicast_connectivity_test(dev1, dev2, tos=None,
+                                    dev1group=False, dev2group=False,
+                                    ifname1=None, ifname2=None,
+                                    config=True, timeout=5,
+                                    send_len=None, multicast_to_unicast=False,
+                                    broadcast_retry_c=1):
+    addr1 = dev1.get_addr(dev1group)
+    addr2 = dev2.get_addr(dev2group)
+
+    if config:
+        config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)
+
+    cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
+    if send_len is not None:
+        cmd += " len=" + str(send_len)
+    for i in range(broadcast_retry_c):
+        try:
+            if dev1group:
+                dev1.group_request(cmd)
+            else:
+                dev1.request(cmd)
+            if dev2group:
+                ev = dev2.wait_group_event(["DATA-TEST-RX"],
+                                           timeout=timeout)
+            else:
+                ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
+            if ev is None:
+                raise Exception("dev1->dev2 broadcast data delivery failed")
+            if multicast_to_unicast:
+                if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing")
+                if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)")
+            else:
+                if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data result")
+            if send_len is not None:
+                if " len=" + str(send_len) not in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data length")
+            else:
+                if " len=" in ev:
+                    raise Exception("Unexpected dev1->dev2 broadcast data length")
+            break
+        except Exception as e:
+            if i == broadcast_retry_c - 1:
+                raise
+
 def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                           ifname1=None, ifname2=None, config=True, timeout=5,
                           multicast_to_unicast=False, broadcast=True,
                           send_len=None):
-    addr1 = dev1.own_addr()
-    if not dev1group and isinstance(dev1, WpaSupplicant):
-        addr = dev1.get_driver_status_field('addr')
-        if addr:
-            addr1 = addr
-
-    addr2 = dev2.own_addr()
-    if not dev2group and isinstance(dev2, WpaSupplicant):
-        addr = dev2.get_driver_status_field('addr')
-        if addr:
-            addr2 = addr
+    addr1 = dev1.get_addr(dev1group)
+    addr2 = dev2.get_addr(dev2group)
 
     dev1.dump_monitor()
     dev2.dump_monitor()
@@ -37,25 +96,7 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
 
     try:
         if config:
-            cmd = "DATA_TEST_CONFIG 1"
-            if ifname1:
-                cmd = cmd + " ifname=" + ifname1
-            if dev1group:
-                res = dev1.group_request(cmd)
-            else:
-                res = dev1.request(cmd)
-            if "OK" not in res:
-                raise Exception("Failed to enable data test functionality")
-
-            cmd = "DATA_TEST_CONFIG 1"
-            if ifname2:
-                cmd = cmd + " ifname=" + ifname2
-            if dev2group:
-                res = dev2.group_request(cmd)
-            else:
-                res = dev2.request(cmd)
-            if "OK" not in res:
-                raise Exception("Failed to enable data test functionality")
+            config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)
 
         cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
         if send_len is not None:
@@ -80,34 +121,10 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                 raise Exception("Unexpected dev1->dev2 unicast data length")
 
         if broadcast:
-            cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
-            if send_len is not None:
-                cmd += " len=" + str(send_len)
-            for i in range(broadcast_retry_c):
-                try:
-                    if dev1group:
-                        dev1.group_request(cmd)
-                    else:
-                        dev1.request(cmd)
-                    if dev2group:
-                        ev = dev2.wait_group_event(["DATA-TEST-RX"],
-                                                   timeout=timeout)
-                    else:
-                        ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
-                    if ev is None:
-                        raise Exception("dev1->dev2 broadcast data delivery failed")
-                    if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
-                        raise Exception("Unexpected dev1->dev2 broadcast data result")
-                    if send_len is not None:
-                        if " len=" + str(send_len) not in ev:
-                            raise Exception("Unexpected dev1->dev2 broadcast data length")
-                    else:
-                        if " len=" in ev:
-                            raise Exception("Unexpected dev1->dev2 broadcast data length")
-                    break
-                except Exception as e:
-                    if i == broadcast_retry_c - 1:
-                        raise
+            run_multicast_connectivity_test(dev1, dev2, tos,
+                                            dev1group, dev2group,
+                                            ifname1, ifname2, False, timeout,
+                                            send_len, False, broadcast_retry_c)
 
         cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
         if send_len is not None:
@@ -132,40 +149,12 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                 raise Exception("Unexpected dev2->dev1 unicast data length")
 
         if broadcast:
-            cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
-            if send_len is not None:
-                cmd += " len=" + str(send_len)
-            for i in range(broadcast_retry_c):
-                try:
-                    if dev2group:
-                        dev2.group_request(cmd)
-                    else:
-                        dev2.request(cmd)
-                    if dev1group:
-                        ev = dev1.wait_group_event(["DATA-TEST-RX"],
-                                                   timeout=timeout)
-                    else:
-                        ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
-                    if ev is None:
-                        raise Exception("dev2->dev1 broadcast data delivery failed")
-                    if multicast_to_unicast:
-                        if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing")
-                        if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)")
-                    else:
-                        if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data result")
-                    if send_len is not None:
-                        if " len=" + str(send_len) not in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data length")
-                    else:
-                        if " len=" in ev:
-                            raise Exception("Unexpected dev2->dev1 broadcast data length")
-                    break
-                except Exception as e:
-                    if i == broadcast_retry_c - 1:
-                        raise
+            run_multicast_connectivity_test(dev2, dev1, tos,
+                                            dev2group, dev1group,
+                                            ifname2, ifname1, False, timeout,
+                                            send_len, multicast_to_unicast,
+                                            broadcast_retry_c)
+
     finally:
         if config:
             if dev1group:
index 1287df80a3cd597f5671c5a4f2822b8ad9cd818f..2283fa99cbecd46defd0be78fa0c341bd423d34f 100644 (file)
@@ -598,6 +598,15 @@ class WpaSupplicant:
             res = self.p2p_dev_addr()
         return res
 
+    def get_addr(self, group=False):
+        dev_addr = self.own_addr()
+        if not group:
+            addr = self.get_status_field('address')
+            if addr:
+                dev_addr = addr
+
+        return dev_addr
+
     def p2p_listen(self):
         return self.global_request("P2P_LISTEN")