From 5ace39b0a4cdbe18ddbc4e18f80ee3876233c20b Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Sat, 14 Sep 2024 18:24:24 +0300 Subject: [PATCH] tests: D-Bus interface for NAN USD Signed-off-by: Jouni Malinen --- tests/hwsim/test_dbus.py | 238 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) diff --git a/tests/hwsim/test_dbus.py b/tests/hwsim/test_dbus.py index 454c1c86f..a46b27f9d 100644 --- a/tests/hwsim/test_dbus.py +++ b/tests/hwsim/test_dbus.py @@ -13,6 +13,7 @@ import shutil import struct import sys from test_ap_hs20 import hs20_ap_params +from test_nan_usd import check_nan_usd_capab, split_nan_event try: if sys.version_info[0] > 2: @@ -6424,3 +6425,240 @@ def test_dbus_bss_anqp_properties(dev, apdev): with TestDbusANQPBSSPropertiesChanged(bus) as t: if not t.success(): raise Exception("Expected signals not seen") + +def test_dbus_nan_usd_publish(dev, apdev): + """D-Bus NAN USD publish""" + check_nan_usd_capab(dev[0]) + (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + class TestDbusNANUSD(TestDbus): + def __init__(self, bus): + TestDbus.__init__(self, bus) + self.publish_terminated = False + + def __enter__(self): + gobject.timeout_add(1, self.start_publish) + gobject.timeout_add(500, self.stop_publish) + gobject.timeout_add(15000, self.timeout) + self.add_signal(self.nanPublishTerminated, WPAS_DBUS_IFACE, + "NANPublishTerminated") + self.loop.run() + return self + + def nanPublishTerminated(self, publish_id, reason): + logger.debug("nanPublishTerminated: %d %s" % (publish_id, reason)) + if publish_id == self.publish_id: + self.publish_terminated = True + + def start_publish(self, *args): + self.publish_id = iface.NANPublish({'srv_name': 'test service', + 'srv_proto_type': 2, + 'ssi': dbus.ByteArray(b'test')}) + iface.NANUpdatePublish({'publish_id': self.publish_id, + 'ssi': dbus.ByteArray(b'new')}) + return False + + def stop_publish(self, *args): + iface.NANCancelPublish(self.publish_id) + return False + + def success(self): + return self.publish_terminated + + with TestDbusNANUSD(bus) as t: + if not t.success(): + raise Exception("Expected signals not seen") + +def test_dbus_nan_usd_subscribe(dev, apdev): + """D-Bus NAN USD subscribe""" + check_nan_usd_capab(dev[0]) + (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + class TestDbusNANUSD(TestDbus): + def __init__(self, bus): + TestDbus.__init__(self, bus) + self.subscribe_terminated = False + + def __enter__(self): + gobject.timeout_add(1, self.start_subscribe) + gobject.timeout_add(500, self.stop_subscribe) + gobject.timeout_add(15000, self.timeout) + self.add_signal(self.nanSubscribeTerminated, WPAS_DBUS_IFACE, + "NANSubscribeTerminated") + self.loop.run() + return self + + def nanSubscribeTerminated(self, subscribe_id, reason): + logger.debug("nanSubscribeTerminated: %d %s" % (subscribe_id, reason)) + if subscribe_id == self.subscribe_id: + self.subscribe_terminated = True + self.loop.quit() + + def start_subscribe(self, *args): + self.subscribe_id = iface.NANSubscribe({'srv_name': 'test service', + 'srv_proto_type': 2, + 'ssi': dbus.ByteArray(b'test')}) + return False + + def stop_subscribe(self, *args): + iface.NANCancelSubscribe(self.subscribe_id) + return False + + def success(self): + return self.subscribe_terminated + + with TestDbusNANUSD(bus) as t: + if not t.success(): + raise Exception("Expected signals not seen") + +def test_dbus_nan_usd_subscribe_followup(dev, apdev): + """D-Bus NAN USD subscribe and followup""" + check_nan_usd_capab(dev[0]) + check_nan_usd_capab(dev[1]) + (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + class TestDbusNANUSD(TestDbus): + def __init__(self, bus): + TestDbus.__init__(self, bus) + self.subscribe_terminated = False + self.discovered = False + self.followup = False + + def __enter__(self): + gobject.timeout_add(1, self.start_subscribe) + gobject.timeout_add(15000, self.timeout) + self.add_signal(self.nanDiscoveryResult, WPAS_DBUS_IFACE, + "NANDiscoveryResult") + self.add_signal(self.nanSubscribeTerminated, WPAS_DBUS_IFACE, + "NANSubscribeTerminated") + self.loop.run() + return self + + def nanDiscoveryResult(self, args): + logger.debug("nanDiscoveryResult: %s" % str(args)) + ssi = args['ssi'] + publish_id = args['publish_id'] + subscribe_id = args['subscribe_id'] + peer_addr = args['peer_addr'] + if publish_id == self.id1 and \ + subscribe_id == self.subscribe_id and \ + args['srv_proto_type'] == 3 and \ + peer_addr == dev[1].own_addr() and \ + len(ssi) == 2 and ssi[0] == 0x66 and ssi[1] == 0x77: + self.discovered = True + ev = dev[1].wait_event(["NAN-RECEIVE"], timeout=5) + if ev is None: + raise Exception("Automatically sent Follow-up message without ssi not seen") + iface.NANTransmit({'handle': subscribe_id, + 'req_instance_id': publish_id, + 'peer_addr': peer_addr, + 'ssi': dbus.ByteArray(b'followup')}) + ev = dev[1].wait_event(["NAN-RECEIVE"], timeout=5) + if ev is None: + raise Exception("Follow-up message not seen") + if "ssi=666f6c6c6f777570" not in ev.split(' '): + raise Exception("Expected SSI not seen in Follow-up") + self.followup = True + else: + logger.info("nanDiscoveryResult values did not match") + + def nanSubscribeTerminated(self, subscribe_id, reason): + logger.debug("nanSubscribeTerminated: %d %s" % (subscribe_id, reason)) + if subscribe_id == self.subscribe_id: + self.subscribe_terminated = True + self.loop.quit() + + def start_subscribe(self, *args): + self.subscribe_id = iface.NANSubscribe({'srv_name': '_test', + 'srv_proto_type': 3, + 'ssi': dbus.ByteArray(b'test')}) + + cmd = "NAN_PUBLISH service_name=_test srv_proto_type=3 ssi=6677 ttl=10" + self.id1 = dev[1].request(cmd) + if "FAIL" in self.id1: + raise Exception("NAN_PUBLISH failed") + self.id1 = int(self.id1) + return False + + def success(self): + return self.subscribe_terminated and self.discovered and \ + self.followup + + with TestDbusNANUSD(bus) as t: + if not t.success(): + raise Exception("Expected signals not seen") + +def test_dbus_nan_usd_publish_followup(dev, apdev): + """D-Bus NAN USD publish and followup""" + check_nan_usd_capab(dev[0]) + (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + class TestDbusNANUSD(TestDbus): + def __init__(self, bus): + TestDbus.__init__(self, bus) + self.publish_terminated = False + self.receive = False + self.first_receive = True + self.followup = False + + def __enter__(self): + gobject.timeout_add(1, self.start_publish) + gobject.timeout_add(15000, self.timeout) + self.add_signal(self.nanPublishTerminated, WPAS_DBUS_IFACE, + "NANPublishTerminated") + self.add_signal(self.nanReceive, WPAS_DBUS_IFACE, + "NANReceive") + self.loop.run() + return self + + def nanPublishTerminated(self, publish_id, reason): + logger.debug("nanPublishTerminated: %d %s" % (publish_id, reason)) + if publish_id == self.publish_id: + self.publish_terminated = True + self.loop.quit() + + def nanReceive(self, args): + logger.debug("nanReceive: %s" % str(args)) + self.receive = True + if self.first_receive: + self.first_receive = False + return + ssi = args['ssi'] + if len(ssi) == 2 and ssi[0] == 0x88 and ssi[1] == 0x99: + self.followup = True + iface.NANTransmit({'handle': args['id'], + 'req_instance_id': args['peer_id'], + 'peer_addr': args['peer_addr'], + 'ssi': dbus.ByteArray(b'followup')}) + + def start_publish(self, *args): + cmd = "NAN_SUBSCRIBE service_name=_test srv_proto_type=3 ssi=1122334455" + id1 = dev[1].request(cmd) + if "FAIL" in id1: + raise Exception("NAN_SUBSCRIBE failed") + + self.publish_id = iface.NANPublish({'srv_name': '_test', + 'srv_proto_type': 2, + 'ssi': dbus.ByteArray(b'test')}) + + ev = dev[1].wait_event(["NAN-DISCOVERY-RESULT"], timeout=10) + if ev is None: + raise Exception("DiscoveryResult event not seen") + + vals = split_nan_event(ev) + cmd = "NAN_TRANSMIT handle={} req_instance_id={} address={} ssi=8899".format(vals['subscribe_id'], vals['publish_id'], vals['address']) + if "FAIL" in dev[1].request(cmd): + raise Exception("NAN_TRANSMIT failed") + + return False + + def success(self): + return self.publish_terminated and self.receive and self.followup + + with TestDbusNANUSD(bus) as t: + if not t.success(): + raise Exception("Expected signals not seen") -- 2.47.2