]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: D-Bus interface for NAN USD
authorJouni Malinen <quic_jouni@quicinc.com>
Sat, 14 Sep 2024 15:24:24 +0000 (18:24 +0300)
committerJouni Malinen <j@w1.fi>
Sun, 15 Sep 2024 09:33:50 +0000 (12:33 +0300)
Signed-off-by: Jouni Malinen <quic_jouni@quicinc.com>
tests/hwsim/test_dbus.py

index 454c1c86fd00d6dab69993026e0925bd742d7823..a46b27f9d349d2c1883310e90c6a2cc02fe248b1 100644 (file)
@@ -13,6 +13,7 @@ import shutil
 import struct
 import sys
 from test_ap_hs20 import hs20_ap_params
+from test_nan_usd import check_nan_usd_capab, split_nan_event
 
 try:
     if sys.version_info[0] > 2:
@@ -6424,3 +6425,240 @@ def test_dbus_bss_anqp_properties(dev, apdev):
     with TestDbusANQPBSSPropertiesChanged(bus) as t:
         if not t.success():
             raise Exception("Expected signals not seen")
+
+def test_dbus_nan_usd_publish(dev, apdev):
+    """D-Bus NAN USD publish"""
+    check_nan_usd_capab(dev[0])
+    (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+    class TestDbusNANUSD(TestDbus):
+        def __init__(self, bus):
+            TestDbus.__init__(self, bus)
+            self.publish_terminated = False
+
+        def __enter__(self):
+            gobject.timeout_add(1, self.start_publish)
+            gobject.timeout_add(500, self.stop_publish)
+            gobject.timeout_add(15000, self.timeout)
+            self.add_signal(self.nanPublishTerminated, WPAS_DBUS_IFACE,
+                            "NANPublishTerminated")
+            self.loop.run()
+            return self
+
+        def nanPublishTerminated(self, publish_id, reason):
+            logger.debug("nanPublishTerminated: %d %s" % (publish_id, reason))
+            if publish_id == self.publish_id:
+                self.publish_terminated = True
+
+        def start_publish(self, *args):
+            self.publish_id = iface.NANPublish({'srv_name': 'test service',
+                                                'srv_proto_type': 2,
+                                                'ssi': dbus.ByteArray(b'test')})
+            iface.NANUpdatePublish({'publish_id': self.publish_id,
+                                    'ssi': dbus.ByteArray(b'new')})
+            return False
+
+        def stop_publish(self, *args):
+            iface.NANCancelPublish(self.publish_id)
+            return False
+
+        def success(self):
+            return self.publish_terminated
+
+    with TestDbusNANUSD(bus) as t:
+        if not t.success():
+            raise Exception("Expected signals not seen")
+
+def test_dbus_nan_usd_subscribe(dev, apdev):
+    """D-Bus NAN USD subscribe"""
+    check_nan_usd_capab(dev[0])
+    (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+    class TestDbusNANUSD(TestDbus):
+        def __init__(self, bus):
+            TestDbus.__init__(self, bus)
+            self.subscribe_terminated = False
+
+        def __enter__(self):
+            gobject.timeout_add(1, self.start_subscribe)
+            gobject.timeout_add(500, self.stop_subscribe)
+            gobject.timeout_add(15000, self.timeout)
+            self.add_signal(self.nanSubscribeTerminated, WPAS_DBUS_IFACE,
+                            "NANSubscribeTerminated")
+            self.loop.run()
+            return self
+
+        def nanSubscribeTerminated(self, subscribe_id, reason):
+            logger.debug("nanSubscribeTerminated: %d %s" % (subscribe_id, reason))
+            if subscribe_id == self.subscribe_id:
+                self.subscribe_terminated = True
+                self.loop.quit()
+
+        def start_subscribe(self, *args):
+            self.subscribe_id = iface.NANSubscribe({'srv_name': 'test service',
+                                                    'srv_proto_type': 2,
+                                                    'ssi': dbus.ByteArray(b'test')})
+            return False
+
+        def stop_subscribe(self, *args):
+            iface.NANCancelSubscribe(self.subscribe_id)
+            return False
+
+        def success(self):
+            return self.subscribe_terminated
+
+    with TestDbusNANUSD(bus) as t:
+        if not t.success():
+            raise Exception("Expected signals not seen")
+
+def test_dbus_nan_usd_subscribe_followup(dev, apdev):
+    """D-Bus NAN USD subscribe and followup"""
+    check_nan_usd_capab(dev[0])
+    check_nan_usd_capab(dev[1])
+    (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+    class TestDbusNANUSD(TestDbus):
+        def __init__(self, bus):
+            TestDbus.__init__(self, bus)
+            self.subscribe_terminated = False
+            self.discovered = False
+            self.followup = False
+
+        def __enter__(self):
+            gobject.timeout_add(1, self.start_subscribe)
+            gobject.timeout_add(15000, self.timeout)
+            self.add_signal(self.nanDiscoveryResult, WPAS_DBUS_IFACE,
+                            "NANDiscoveryResult")
+            self.add_signal(self.nanSubscribeTerminated, WPAS_DBUS_IFACE,
+                            "NANSubscribeTerminated")
+            self.loop.run()
+            return self
+
+        def nanDiscoveryResult(self, args):
+            logger.debug("nanDiscoveryResult: %s" % str(args))
+            ssi = args['ssi']
+            publish_id = args['publish_id']
+            subscribe_id = args['subscribe_id']
+            peer_addr = args['peer_addr']
+            if publish_id == self.id1 and \
+               subscribe_id == self.subscribe_id and \
+               args['srv_proto_type'] == 3 and \
+               peer_addr == dev[1].own_addr() and \
+               len(ssi) == 2 and ssi[0] == 0x66 and ssi[1] == 0x77:
+                self.discovered = True
+                ev = dev[1].wait_event(["NAN-RECEIVE"], timeout=5)
+                if ev is None:
+                    raise Exception("Automatically sent Follow-up message without ssi not seen")
+                iface.NANTransmit({'handle': subscribe_id,
+                                   'req_instance_id': publish_id,
+                                   'peer_addr': peer_addr,
+                                   'ssi': dbus.ByteArray(b'followup')})
+                ev = dev[1].wait_event(["NAN-RECEIVE"], timeout=5)
+                if ev is None:
+                    raise Exception("Follow-up message not seen")
+                if "ssi=666f6c6c6f777570" not in ev.split(' '):
+                    raise Exception("Expected SSI not seen in Follow-up")
+                self.followup = True
+            else:
+                logger.info("nanDiscoveryResult values did not match")
+
+        def nanSubscribeTerminated(self, subscribe_id, reason):
+            logger.debug("nanSubscribeTerminated: %d %s" % (subscribe_id, reason))
+            if subscribe_id == self.subscribe_id:
+                self.subscribe_terminated = True
+                self.loop.quit()
+
+        def start_subscribe(self, *args):
+            self.subscribe_id = iface.NANSubscribe({'srv_name': '_test',
+                                                    'srv_proto_type': 3,
+                                                    'ssi': dbus.ByteArray(b'test')})
+
+            cmd = "NAN_PUBLISH service_name=_test srv_proto_type=3 ssi=6677 ttl=10"
+            self.id1 = dev[1].request(cmd)
+            if "FAIL" in self.id1:
+                raise Exception("NAN_PUBLISH failed")
+            self.id1 = int(self.id1)
+            return False
+
+        def success(self):
+            return self.subscribe_terminated and self.discovered and \
+                self.followup
+
+    with TestDbusNANUSD(bus) as t:
+        if not t.success():
+            raise Exception("Expected signals not seen")
+
+def test_dbus_nan_usd_publish_followup(dev, apdev):
+    """D-Bus NAN USD publish and followup"""
+    check_nan_usd_capab(dev[0])
+    (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+    class TestDbusNANUSD(TestDbus):
+        def __init__(self, bus):
+            TestDbus.__init__(self, bus)
+            self.publish_terminated = False
+            self.receive = False
+            self.first_receive = True
+            self.followup = False
+
+        def __enter__(self):
+            gobject.timeout_add(1, self.start_publish)
+            gobject.timeout_add(15000, self.timeout)
+            self.add_signal(self.nanPublishTerminated, WPAS_DBUS_IFACE,
+                            "NANPublishTerminated")
+            self.add_signal(self.nanReceive, WPAS_DBUS_IFACE,
+                            "NANReceive")
+            self.loop.run()
+            return self
+
+        def nanPublishTerminated(self, publish_id, reason):
+            logger.debug("nanPublishTerminated: %d %s" % (publish_id, reason))
+            if publish_id == self.publish_id:
+                self.publish_terminated = True
+                self.loop.quit()
+
+        def nanReceive(self, args):
+            logger.debug("nanReceive: %s" % str(args))
+            self.receive = True
+            if self.first_receive:
+                self.first_receive = False
+                return
+            ssi = args['ssi']
+            if len(ssi) == 2 and ssi[0] == 0x88 and ssi[1] == 0x99:
+                self.followup = True
+                iface.NANTransmit({'handle': args['id'],
+                                   'req_instance_id': args['peer_id'],
+                                   'peer_addr': args['peer_addr'],
+                                   'ssi': dbus.ByteArray(b'followup')})
+
+        def start_publish(self, *args):
+            cmd = "NAN_SUBSCRIBE service_name=_test srv_proto_type=3 ssi=1122334455"
+            id1 = dev[1].request(cmd)
+            if "FAIL" in id1:
+                raise Exception("NAN_SUBSCRIBE failed")
+
+            self.publish_id = iface.NANPublish({'srv_name': '_test',
+                                                'srv_proto_type': 2,
+                                                'ssi': dbus.ByteArray(b'test')})
+
+            ev = dev[1].wait_event(["NAN-DISCOVERY-RESULT"], timeout=10)
+            if ev is None:
+                raise Exception("DiscoveryResult event not seen")
+
+            vals = split_nan_event(ev)
+            cmd = "NAN_TRANSMIT handle={} req_instance_id={} address={} ssi=8899".format(vals['subscribe_id'], vals['publish_id'], vals['address'])
+            if "FAIL" in dev[1].request(cmd):
+                raise Exception("NAN_TRANSMIT failed")
+
+            return False
+
+        def success(self):
+            return self.publish_terminated and self.receive and self.followup
+
+    with TestDbusNANUSD(bus) as t:
+        if not t.success():
+            raise Exception("Expected signals not seen")