ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
if ev is None:
- raise Exception("regdom change event not seen")
+ # For now, work around separate P2P Device interface event delivery
+ ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+ if ev is None:
+ raise Exception("regdom change event not seen")
if "init=USER type=COUNTRY alpha2=FI" not in ev:
raise Exception("Unexpected event contents: " + ev)
ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
if ev is None:
- raise Exception("regdom change event not seen")
+ # For now, work around separate P2P Device interface event delivery
+ ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+ if ev is None:
+ raise Exception("regdom change event not seen")
if "init=CORE type=WORLD" not in ev:
raise Exception("Unexpected event contents: " + ev)
def test_dbus_probe_req_reporting(dev, apdev):
"""D-Bus Probe Request reporting"""
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
- dev[0].p2p_start_go(freq=2412)
dev[1].p2p_find(social=True)
class TestDbusProbe(TestDbus):
def __enter__(self):
gobject.timeout_add(1, self.run_test)
gobject.timeout_add(15000, self.timeout)
+ self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
+ "GroupStarted")
self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
byte_arrays=True)
- iface.SubscribeProbeReq()
self.loop.run()
return self
+ def groupStarted(self, properties):
+ logger.debug("groupStarted: " + str(properties))
+ g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
+ self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
+ self.iface.SubscribeProbeReq()
+ self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
+
def probeRequest(self, args):
logger.debug("probeRequest: args=%s" % str(args))
self.reported = True
def run_test(self, *args):
logger.debug("run_test")
+ p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
+ params = dbus.Dictionary({ 'frequency': 2412 })
+ p2p.GroupAdd(params)
return False
def success(self):
with TestDbusProbe(bus) as t:
if not t.success():
raise Exception("Expected signals not seen")
- iface.UnsubscribeProbeReq()
-
- try:
- iface.UnsubscribeProbeReq()
- raise Exception("Invalid UnsubscribeProbeReq() accepted")
- except dbus.exceptions.DBusException, e:
- if "NoSubscription" not in str(e):
- raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
+ t.iface.UnsubscribeProbeReq()
+ try:
+ t.iface.UnsubscribeProbeReq()
+ raise Exception("Invalid UnsubscribeProbeReq() accepted")
+ except dbus.exceptions.DBusException, e:
+ if "NoSubscription" not in str(e):
+ raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
+ t.group_p2p.Disconnect()
with TestDbusProbe(bus) as t:
if not t.success():
# cleanup.
dev[1].p2p_stop_find()
- dev[0].remove_group()
def test_dbus_probe_req_reporting_oom(dev, apdev):
"""D-Bus Probe Request reporting (OOM)"""
"""D-Bus P2P autonomous GO"""
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
- wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
addr0 = dev[0].p2p_dev_addr()
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
self.group = properties['group_object']
- role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
- dbus_interface=dbus.PROPERTIES_IFACE)
+ self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
+ role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
+ dbus_interface=dbus.PROPERTIES_IFACE)
if role != "GO":
raise Exception("Unexpected role reported: " + role)
- group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
- dbus_interface=dbus.PROPERTIES_IFACE)
+ group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
+ dbus_interface=dbus.PROPERTIES_IFACE)
if group != properties['group_object']:
raise Exception("Unexpected Group reported: " + str(group))
- go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
- dbus_interface=dbus.PROPERTIES_IFACE)
+ go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
+ dbus_interface=dbus.PROPERTIES_IFACE)
if go != '/':
raise Exception("Unexpected PeerGO value: " + str(go))
if self.first:
self.first = False
logger.info("Remove persistent group instance")
- p2p.Disconnect()
+ group_p2p = dbus.Interface(self.g_if_obj,
+ WPAS_DBUS_IFACE_P2PDEVICE)
+ group_p2p.Disconnect()
else:
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
'P2PDeviceAddress': self.peer['DeviceAddress'],
'Bssid': self.peer['DeviceAddress'],
'Type': 'pin' }
+ wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
try:
wps.Start(params)
raise Exception("Invalid WPS.Start() accepted")
raise Exception("Unexpected error message: " + str(e))
params = { 'Role': 'registrar',
'P2PDeviceAddress': self.peer['DeviceAddress'],
- 'Bssid': self.peer['DeviceAddress'],
'Type': 'pin',
'Pin': '12345670' }
logger.info("Authorize peer to connect to the group")
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
self.waiting_end = True
- p2p.Disconnect()
+ group_p2p = dbus.Interface(self.g_if_obj,
+ WPAS_DBUS_IFACE_P2PDEVICE)
+ group_p2p.Disconnect()
def run_test(self, *args):
logger.debug("run_test")
"""D-Bus P2P autonomous GO and PBC"""
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
- wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
addr0 = dev[0].p2p_dev_addr()
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
self.group = properties['group_object']
+ self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
addr += '%02x' % ord(p)
params = { 'Role': 'registrar',
'P2PDeviceAddress': self.peer['DeviceAddress'],
- 'Bssid': self.peer['DeviceAddress'],
'Type': 'pbc' }
logger.info("Authorize peer to connect to the group")
+ wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
wps.Start(params)
def staAuthorized(self, name):
logger.debug("staAuthorized: " + name)
- p2p.Disconnect()
+ group_p2p = dbus.Interface(self.g_if_obj,
+ WPAS_DBUS_IFACE_P2PDEVICE)
+ group_p2p.Disconnect()
def run_test(self, *args):
logger.debug("run_test")
"""D-Bus P2P autonomous GO and legacy STA"""
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
- wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
addr0 = dev[0].p2p_dev_addr()
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
+ g_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['group_object'])
+ res = g_obj.GetAll(WPAS_DBUS_GROUP,
+ dbus_interface=dbus.PROPERTIES_IFACE,
+ byte_arrays=True)
+ bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
+
pin = '12345670'
params = { 'Role': 'enrollee',
'Type': 'pin',
'Pin': pin }
+ g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
+ wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
wps.Start(params)
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
- dev1.scan_for_bss(addr0, freq=2412)
- dev1.request("WPS_PIN " + addr0 + " " + pin)
+ dev1.scan_for_bss(bssid, freq=2412)
+ dev1.request("WPS_PIN " + bssid + " " + pin)
+ self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
def groupFinished(self, properties):
logger.debug("groupFinished: " + str(properties))
logger.debug("staAuthorized: " + name)
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
dev1.request("DISCONNECT")
- p2p.Disconnect()
+ self.group_p2p.Disconnect()
def run_test(self, *args):
logger.debug("run_test")
addr1 = dev[1].p2p_dev_addr()
addr2 = dev[2].p2p_dev_addr()
dev[1].p2p_start_go(freq=2412)
+ dev1_group_ifname = dev[1].group_ifname
dev[2].p2p_listen()
class TestDbusP2p(TestDbus):
pin = p2p.Connect(args)
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
- dev1.request("WPS_PIN any " + pin)
+ dev1.group_ifname = dev1_group_ifname
+ dev1.group_request("WPS_PIN any " + pin)
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
- role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
- dbus_interface=dbus.PROPERTIES_IFACE)
+ g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
+ role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
+ dbus_interface=dbus.PROPERTIES_IFACE)
if role != "client":
raise Exception("Unexpected role reported: " + role)
- group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
- dbus_interface=dbus.PROPERTIES_IFACE)
+ group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
+ dbus_interface=dbus.PROPERTIES_IFACE)
if group != properties['group_object']:
raise Exception("Unexpected Group reported: " + str(group))
- go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
- dbus_interface=dbus.PROPERTIES_IFACE)
+ go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
+ dbus_interface=dbus.PROPERTIES_IFACE)
if go != self.go:
raise Exception("Unexpected PeerGO value: " + str(go))
if "Error.Failed: Failed to set property" not in str(e):
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
+ group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
args = { 'duration1': 30000, 'interval1': 102400,
'duration2': 20000, 'interval2': 102400 }
- p2p.PresenceRequest(args)
+ group_p2p.PresenceRequest(args)
args = { 'peer': self.peer }
- p2p.Invite(args)
+ group_p2p.Invite(args)
def groupFinished(self, properties):
logger.debug("groupFinished: " + str(properties))
if result['status'] != 1:
raise Exception("Unexpected invitation result: " + str(result))
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
+ dev1.group_ifname = dev1_group_ifname
dev1.remove_group()
def run_test(self, *args):
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
- p2p.Disconnect()
+ g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
+ group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
+ group_p2p.Disconnect()
def groupFinished(self, properties):
logger.debug("groupFinished: " + str(properties))
"""D-Bus P2P reinvoke persistent group"""
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
- wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
addr0 = dev[0].p2p_dev_addr()
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
+ self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
if not self.invited:
+ g_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['group_object'])
+ res = g_obj.GetAll(WPAS_DBUS_GROUP,
+ dbus_interface=dbus.PROPERTIES_IFACE,
+ byte_arrays=True)
+ bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
- dev1.scan_for_bss(addr0, freq=2412)
+ dev1.scan_for_bss(bssid, freq=2412)
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
def groupFinished(self, properties):
self.loop.quit()
else:
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
- dev1.request("SET persistent_reconnect 1")
+ dev1.global_request("SET persistent_reconnect 1")
dev1.p2p_listen()
args = { 'persistent_group_object': dbus.ObjectPath(path),
pin = p2p.Invite(args)
self.invited = True
+ self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
+ timeout=15)
+ if self.sta_group_ev is None:
+ raise Exception("P2P-GROUP-STARTED event not seen")
+
def persistentGroupAdded(self, path, properties):
logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
self.persistent = path
'Type': 'pin',
'Pin': '12345670' }
logger.info("Authorize peer to connect to the group")
+ dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
+ wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
wps.Start(params)
+ self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
+ timeout=15)
+ if self.sta_group_ev is None:
+ raise Exception("P2P-GROUP-STARTED event not seen")
def staAuthorized(self, name):
logger.debug("staAuthorized: " + name)
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
+ dev1.group_form_result(self.sta_group_ev)
dev1.remove_group()
- ev = dev1.wait_event(["P2P-GROUP-REMOVED"], timeout=10)
+ ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
if ev is None:
raise Exception("Group removal timed out")
- p2p.Disconnect()
+ group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
+ group_p2p.Disconnect()
def run_test(self, *args):
logger.debug("run_test")
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
- p2p.Disconnect()
+ g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
+ group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
+ group_p2p.Disconnect()
def groupFinished(self, properties):
logger.debug("groupFinished: " + str(properties))
ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
if ev is None:
raise Exception("Group formation timed out")
+ self.sta_group_ev = ev
def goNegotiationSuccess(self, properties):
logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
+ self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
+ dev1.group_form_result(self.sta_group_ev)
dev1.remove_group()
def staDeauthorized(self, name):
logger.debug("staDeuthorized: " + name)
- p2p.Disconnect()
+ group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
+ group_p2p.Disconnect()
def peerJoined(self, peer):
logger.debug("peerJoined: " + peer)
ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
if ev is None:
raise Exception("Group formation timed out")
+ self.sta_group_ev = ev
def goNegotiationSuccess(self, properties):
logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
def groupStarted(self, properties):
logger.debug("groupStarted: " + str(properties))
- p2p.Disconnect()
+ g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
+ properties['interface_object'])
+ group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
+ group_p2p.Disconnect()
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
+ dev1.group_form_result(self.sta_group_ev)
dev1.remove_group()
def groupFinished(self, properties):
addr1 = dev[1].p2p_dev_addr()
addr2 = dev[2].p2p_dev_addr()
dev[1].p2p_start_go(freq=2412)
+ dev1_group_ifname = dev[1].group_ifname
class TestDbusP2p(TestDbus):
def __init__(self, bus):
p2p.StopFind()
pin = '12345670'
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
- dev1.request("WPS_PIN any " + pin)
+ dev1.group_ifname = dev1_group_ifname
+ dev1.group_request("WPS_PIN any " + pin)
args = { 'peer': self.go,
'join': True,
'wps_method': 'pin',
dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
dev2.scan_for_bss(bssid, freq=2412)
dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
+ ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
+ if ev is None:
+ raise Exception("Group join timed out")
+ self.dev2_group_ev = ev
def groupFinished(self, properties):
logger.debug("groupFinished: " + str(properties))
self.check_results()
dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
+ dev2.group_form_result(self.dev2_group_ev)
dev2.remove_group()
logger.info("Disconnect group2")