import struct
import sys
from test_ap_hs20 import hs20_ap_params
+from test_nan_usd import check_nan_usd_capab, split_nan_event
try:
if sys.version_info[0] > 2:
with TestDbusANQPBSSPropertiesChanged(bus) as t:
if not t.success():
raise Exception("Expected signals not seen")
+
+def test_dbus_nan_usd_publish(dev, apdev):
+ """D-Bus NAN USD publish"""
+ check_nan_usd_capab(dev[0])
+ (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+ iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+ class TestDbusNANUSD(TestDbus):
+ def __init__(self, bus):
+ TestDbus.__init__(self, bus)
+ self.publish_terminated = False
+
+ def __enter__(self):
+ gobject.timeout_add(1, self.start_publish)
+ gobject.timeout_add(500, self.stop_publish)
+ gobject.timeout_add(15000, self.timeout)
+ self.add_signal(self.nanPublishTerminated, WPAS_DBUS_IFACE,
+ "NANPublishTerminated")
+ self.loop.run()
+ return self
+
+ def nanPublishTerminated(self, publish_id, reason):
+ logger.debug("nanPublishTerminated: %d %s" % (publish_id, reason))
+ if publish_id == self.publish_id:
+ self.publish_terminated = True
+
+ def start_publish(self, *args):
+ self.publish_id = iface.NANPublish({'srv_name': 'test service',
+ 'srv_proto_type': 2,
+ 'ssi': dbus.ByteArray(b'test')})
+ iface.NANUpdatePublish({'publish_id': self.publish_id,
+ 'ssi': dbus.ByteArray(b'new')})
+ return False
+
+ def stop_publish(self, *args):
+ iface.NANCancelPublish(self.publish_id)
+ return False
+
+ def success(self):
+ return self.publish_terminated
+
+ with TestDbusNANUSD(bus) as t:
+ if not t.success():
+ raise Exception("Expected signals not seen")
+
+def test_dbus_nan_usd_subscribe(dev, apdev):
+ """D-Bus NAN USD subscribe"""
+ check_nan_usd_capab(dev[0])
+ (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+ iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+ class TestDbusNANUSD(TestDbus):
+ def __init__(self, bus):
+ TestDbus.__init__(self, bus)
+ self.subscribe_terminated = False
+
+ def __enter__(self):
+ gobject.timeout_add(1, self.start_subscribe)
+ gobject.timeout_add(500, self.stop_subscribe)
+ gobject.timeout_add(15000, self.timeout)
+ self.add_signal(self.nanSubscribeTerminated, WPAS_DBUS_IFACE,
+ "NANSubscribeTerminated")
+ self.loop.run()
+ return self
+
+ def nanSubscribeTerminated(self, subscribe_id, reason):
+ logger.debug("nanSubscribeTerminated: %d %s" % (subscribe_id, reason))
+ if subscribe_id == self.subscribe_id:
+ self.subscribe_terminated = True
+ self.loop.quit()
+
+ def start_subscribe(self, *args):
+ self.subscribe_id = iface.NANSubscribe({'srv_name': 'test service',
+ 'srv_proto_type': 2,
+ 'ssi': dbus.ByteArray(b'test')})
+ return False
+
+ def stop_subscribe(self, *args):
+ iface.NANCancelSubscribe(self.subscribe_id)
+ return False
+
+ def success(self):
+ return self.subscribe_terminated
+
+ with TestDbusNANUSD(bus) as t:
+ if not t.success():
+ raise Exception("Expected signals not seen")
+
+def test_dbus_nan_usd_subscribe_followup(dev, apdev):
+ """D-Bus NAN USD subscribe and followup"""
+ check_nan_usd_capab(dev[0])
+ check_nan_usd_capab(dev[1])
+ (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+ iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+ class TestDbusNANUSD(TestDbus):
+ def __init__(self, bus):
+ TestDbus.__init__(self, bus)
+ self.subscribe_terminated = False
+ self.discovered = False
+ self.followup = False
+
+ def __enter__(self):
+ gobject.timeout_add(1, self.start_subscribe)
+ gobject.timeout_add(15000, self.timeout)
+ self.add_signal(self.nanDiscoveryResult, WPAS_DBUS_IFACE,
+ "NANDiscoveryResult")
+ self.add_signal(self.nanSubscribeTerminated, WPAS_DBUS_IFACE,
+ "NANSubscribeTerminated")
+ self.loop.run()
+ return self
+
+ def nanDiscoveryResult(self, args):
+ logger.debug("nanDiscoveryResult: %s" % str(args))
+ ssi = args['ssi']
+ publish_id = args['publish_id']
+ subscribe_id = args['subscribe_id']
+ peer_addr = args['peer_addr']
+ if publish_id == self.id1 and \
+ subscribe_id == self.subscribe_id and \
+ args['srv_proto_type'] == 3 and \
+ peer_addr == dev[1].own_addr() and \
+ len(ssi) == 2 and ssi[0] == 0x66 and ssi[1] == 0x77:
+ self.discovered = True
+ ev = dev[1].wait_event(["NAN-RECEIVE"], timeout=5)
+ if ev is None:
+ raise Exception("Automatically sent Follow-up message without ssi not seen")
+ iface.NANTransmit({'handle': subscribe_id,
+ 'req_instance_id': publish_id,
+ 'peer_addr': peer_addr,
+ 'ssi': dbus.ByteArray(b'followup')})
+ ev = dev[1].wait_event(["NAN-RECEIVE"], timeout=5)
+ if ev is None:
+ raise Exception("Follow-up message not seen")
+ if "ssi=666f6c6c6f777570" not in ev.split(' '):
+ raise Exception("Expected SSI not seen in Follow-up")
+ self.followup = True
+ else:
+ logger.info("nanDiscoveryResult values did not match")
+
+ def nanSubscribeTerminated(self, subscribe_id, reason):
+ logger.debug("nanSubscribeTerminated: %d %s" % (subscribe_id, reason))
+ if subscribe_id == self.subscribe_id:
+ self.subscribe_terminated = True
+ self.loop.quit()
+
+ def start_subscribe(self, *args):
+ self.subscribe_id = iface.NANSubscribe({'srv_name': '_test',
+ 'srv_proto_type': 3,
+ 'ssi': dbus.ByteArray(b'test')})
+
+ cmd = "NAN_PUBLISH service_name=_test srv_proto_type=3 ssi=6677 ttl=10"
+ self.id1 = dev[1].request(cmd)
+ if "FAIL" in self.id1:
+ raise Exception("NAN_PUBLISH failed")
+ self.id1 = int(self.id1)
+ return False
+
+ def success(self):
+ return self.subscribe_terminated and self.discovered and \
+ self.followup
+
+ with TestDbusNANUSD(bus) as t:
+ if not t.success():
+ raise Exception("Expected signals not seen")
+
+def test_dbus_nan_usd_publish_followup(dev, apdev):
+ """D-Bus NAN USD publish and followup"""
+ check_nan_usd_capab(dev[0])
+ (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+ iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+ class TestDbusNANUSD(TestDbus):
+ def __init__(self, bus):
+ TestDbus.__init__(self, bus)
+ self.publish_terminated = False
+ self.receive = False
+ self.first_receive = True
+ self.followup = False
+
+ def __enter__(self):
+ gobject.timeout_add(1, self.start_publish)
+ gobject.timeout_add(15000, self.timeout)
+ self.add_signal(self.nanPublishTerminated, WPAS_DBUS_IFACE,
+ "NANPublishTerminated")
+ self.add_signal(self.nanReceive, WPAS_DBUS_IFACE,
+ "NANReceive")
+ self.loop.run()
+ return self
+
+ def nanPublishTerminated(self, publish_id, reason):
+ logger.debug("nanPublishTerminated: %d %s" % (publish_id, reason))
+ if publish_id == self.publish_id:
+ self.publish_terminated = True
+ self.loop.quit()
+
+ def nanReceive(self, args):
+ logger.debug("nanReceive: %s" % str(args))
+ self.receive = True
+ if self.first_receive:
+ self.first_receive = False
+ return
+ ssi = args['ssi']
+ if len(ssi) == 2 and ssi[0] == 0x88 and ssi[1] == 0x99:
+ self.followup = True
+ iface.NANTransmit({'handle': args['id'],
+ 'req_instance_id': args['peer_id'],
+ 'peer_addr': args['peer_addr'],
+ 'ssi': dbus.ByteArray(b'followup')})
+
+ def start_publish(self, *args):
+ cmd = "NAN_SUBSCRIBE service_name=_test srv_proto_type=3 ssi=1122334455"
+ id1 = dev[1].request(cmd)
+ if "FAIL" in id1:
+ raise Exception("NAN_SUBSCRIBE failed")
+
+ self.publish_id = iface.NANPublish({'srv_name': '_test',
+ 'srv_proto_type': 2,
+ 'ssi': dbus.ByteArray(b'test')})
+
+ ev = dev[1].wait_event(["NAN-DISCOVERY-RESULT"], timeout=10)
+ if ev is None:
+ raise Exception("DiscoveryResult event not seen")
+
+ vals = split_nan_event(ev)
+ cmd = "NAN_TRANSMIT handle={} req_instance_id={} address={} ssi=8899".format(vals['subscribe_id'], vals['publish_id'], vals['address'])
+ if "FAIL" in dev[1].request(cmd):
+ raise Exception("NAN_TRANSMIT failed")
+
+ return False
+
+ def success(self):
+ return self.publish_terminated and self.receive and self.followup
+
+ with TestDbusNANUSD(bus) as t:
+ if not t.success():
+ raise Exception("Expected signals not seen")