]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
Allow PSK/passphrase to be set only when needed
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67 1# wpa_supplicant D-Bus interface tests
910eb5b5 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
2ec82e67
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
2ec82e67
JM
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12
910eb5b5 13try:
b4de353c 14 import gobject
910eb5b5
JM
15 import dbus
16 dbus_imported = True
17except ImportError:
18 dbus_imported = False
19
2ec82e67
JM
20import hostapd
21from wpasupplicant import WpaSupplicant
0e126c6d 22from utils import HwsimSkip, alloc_fail
2ec82e67
JM
23from test_ap_tdls import connect_2sta_open
24
25WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
26WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
27WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
28WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
29WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
30WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
31WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
32WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
33WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
34WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
35
36def prepare_dbus(dev):
910eb5b5
JM
37 if not dbus_imported:
38 logger.info("No dbus module available")
51c5aeb4 39 raise HwsimSkip("No dbus module available")
2ec82e67 40 try:
2ec82e67
JM
41 from dbus.mainloop.glib import DBusGMainLoop
42 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
43 bus = dbus.SystemBus()
44 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
45 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
46 path = wpas.GetInterface(dev.ifname)
47 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
910eb5b5 48 return (bus,wpas_obj,path,if_obj)
2ec82e67 49 except Exception, e:
51c5aeb4 50 raise HwsimSkip("Could not connect to D-Bus: %s" % e)
2ec82e67
JM
51
52class TestDbus(object):
53 def __init__(self, bus):
54 self.loop = gobject.MainLoop()
55 self.signals = []
56 self.bus = bus
57
58 def __exit__(self, type, value, traceback):
59 for s in self.signals:
60 s.remove()
61
62 def add_signal(self, handler, interface, name, byte_arrays=False):
63 s = self.bus.add_signal_receiver(handler, dbus_interface=interface,
64 signal_name=name,
65 byte_arrays=byte_arrays)
66 self.signals.append(s)
67
68 def timeout(self, *args):
69 logger.debug("timeout")
70 self.loop.quit()
71 return False
72
0e126c6d
JM
73class alloc_fail_dbus(object):
74 def __init__(self, dev, count, funcs, operation="Operation",
75 expected="NoMemory"):
76 self._dev = dev
77 self._count = count
78 self._funcs = funcs
79 self._operation = operation
80 self._expected = expected
81 def __enter__(self):
82 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
83 if "OK" not in self._dev.request(cmd):
84 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
85 def __exit__(self, type, value, traceback):
86 if type is None:
87 raise Exception("%s succeeded during out-of-memory" % self._operation)
88 if type == dbus.exceptions.DBusException and self._expected in str(value):
89 return True
90 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
91 raise Exception("%s did not trigger allocation failure" % self._operation)
92 return False
93
2ec82e67
JM
94def start_ap(ap):
95 ssid = "test-wps"
96 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
97 "wpa_passphrase": "12345678", "wpa": "2",
98 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
99 "ap_pin": "12345670"}
100 return hostapd.add_ap(ap['ifname'], params)
101
102def test_dbus_getall(dev, apdev):
103 """D-Bus GetAll"""
910eb5b5 104 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
105
106 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
107 dbus_interface=dbus.PROPERTIES_IFACE)
108 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))
109
110 props = if_obj.GetAll(WPAS_DBUS_IFACE,
111 dbus_interface=dbus.PROPERTIES_IFACE)
112 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))
113
114 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
115 dbus_interface=dbus.PROPERTIES_IFACE)
116 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))
117
118 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
119 dbus_interface=dbus.PROPERTIES_IFACE)
120 if len(res) != 0:
121 raise Exception("Unexpected BSSs entry: " + str(res))
122
123 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
124 dbus_interface=dbus.PROPERTIES_IFACE)
125 if len(res) != 0:
126 raise Exception("Unexpected Networks entry: " + str(res))
127
128 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
129 bssid = apdev[0]['bssid']
130 dev[0].scan_for_bss(bssid, freq=2412)
131 id = dev[0].add_network()
132 dev[0].set_network(id, "disabled", "0")
133 dev[0].set_network_quoted(id, "ssid", "test")
134
135 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
136 dbus_interface=dbus.PROPERTIES_IFACE)
137 if len(res) != 1:
138 raise Exception("Missing BSSs entry: " + str(res))
139 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
140 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
141 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
142 bssid_str = ''
143 for item in props['BSSID']:
144 if len(bssid_str) > 0:
145 bssid_str += ':'
146 bssid_str += '%02x' % item
147 if bssid_str != bssid:
148 raise Exception("Unexpected BSSID in BSSs entry")
149
150 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
151 dbus_interface=dbus.PROPERTIES_IFACE)
152 if len(res) != 1:
153 raise Exception("Missing Networks entry: " + str(res))
154 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
155 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
156 dbus_interface=dbus.PROPERTIES_IFACE)
157 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
158 ssid = props['Properties']['ssid']
159 if ssid != '"test"':
160 raise Exception("Unexpected SSID in network entry")
161
162def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
163 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
164 dbus_interface=dbus.PROPERTIES_IFACE,
165 byte_arrays=byte_arrays)
166 if expect is not None and val != expect:
167 raise Exception("Unexpected %s: %s (expected: %s)" %
168 (prop, str(val), str(expect)))
169 return val
170
171def dbus_set(dbus, wpas_obj, prop, val):
172 wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
173 dbus_interface=dbus.PROPERTIES_IFACE)
174
175def test_dbus_properties(dev, apdev):
176 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
910eb5b5 177 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
178
179 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
180 dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
181 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
182 for (val,err) in [ (3, "Error.Failed: wrong property type"),
183 ("foo", "Error.Failed: wrong debug level value") ]:
184 try:
185 dbus_set(dbus, wpas_obj, "DebugLevel", val)
186 raise Exception("Invalid DebugLevel value accepted: " + str(val))
187 except dbus.exceptions.DBusException, e:
188 if err not in str(e):
189 raise Exception("Unexpected error message: " + str(e))
190 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
191 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
192
193 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
194 dbus_set(dbus, wpas_obj, "DebugTimestamp", False)
195 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False)
196 try:
197 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo")
198 raise Exception("Invalid DebugTimestamp value accepted")
199 except dbus.exceptions.DBusException, e:
200 if "Error.Failed: wrong property type" not in str(e):
201 raise Exception("Unexpected error message: " + str(e))
202 dbus_set(dbus, wpas_obj, "DebugTimestamp", True)
203 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
204
205 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
206 dbus_set(dbus, wpas_obj, "DebugShowKeys", False)
207 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False)
208 try:
209 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo")
210 raise Exception("Invalid DebugShowKeys value accepted")
211 except dbus.exceptions.DBusException, e:
212 if "Error.Failed: wrong property type" not in str(e):
213 raise Exception("Unexpected error message: " + str(e))
214 dbus_set(dbus, wpas_obj, "DebugShowKeys", True)
215 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
216
217 res = dbus_get(dbus, wpas_obj, "Interfaces")
218 if len(res) != 1:
219 raise Exception("Unexpected Interfaces value: " + str(res))
220
221 res = dbus_get(dbus, wpas_obj, "EapMethods")
222 if len(res) < 5 or "TTLS" not in res:
223 raise Exception("Unexpected EapMethods value: " + str(res))
224
225 res = dbus_get(dbus, wpas_obj, "Capabilities")
226 if len(res) < 2 or "p2p" not in res:
227 raise Exception("Unexpected Capabilities value: " + str(res))
228
229 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
230 val = binascii.unhexlify("010006020304050608")
231 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
232 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
233 if val != res:
234 raise Exception("WFDIEs value changed")
235 try:
236 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
237 raise Exception("Invalid WFDIEs value accepted")
238 except dbus.exceptions.DBusException, e:
239 if "InvalidArgs" not in str(e):
240 raise Exception("Unexpected error message: " + str(e))
241 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
242 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
243 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
244 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
245 if len(res) != 0:
246 raise Exception("WFDIEs not cleared properly")
247
248 res = dbus_get(dbus, wpas_obj, "EapMethods")
249 try:
250 dbus_set(dbus, wpas_obj, "EapMethods", res)
251 raise Exception("Invalid Set accepted")
252 except dbus.exceptions.DBusException, e:
253 if "InvalidArgs: Property is read-only" not in str(e):
254 raise Exception("Unexpected error message: " + str(e))
255
256 try:
257 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
258 dbus_interface=dbus.PROPERTIES_IFACE)
259 raise Exception("Unknown method accepted")
260 except dbus.exceptions.DBusException, e:
261 if "UnknownMethod" not in str(e):
262 raise Exception("Unexpected error message: " + str(e))
263
264 try:
265 wpas_obj.Get("foo", "DebugShowKeys",
266 dbus_interface=dbus.PROPERTIES_IFACE)
267 raise Exception("Invalid Get accepted")
268 except dbus.exceptions.DBusException, e:
269 if "InvalidArgs: No such property" not in str(e):
270 raise Exception("Unexpected error message: " + str(e))
271
272 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
273 introspect=False)
274 try:
275 test_obj.Get(123, "DebugShowKeys",
276 dbus_interface=dbus.PROPERTIES_IFACE)
277 raise Exception("Invalid Get accepted")
278 except dbus.exceptions.DBusException, e:
279 if "InvalidArgs: Invalid arguments" not in str(e):
280 raise Exception("Unexpected error message: " + str(e))
281 try:
282 test_obj.Get(WPAS_DBUS_SERVICE, 123,
283 dbus_interface=dbus.PROPERTIES_IFACE)
284 raise Exception("Invalid Get accepted")
285 except dbus.exceptions.DBusException, e:
286 if "InvalidArgs: Invalid arguments" not in str(e):
287 raise Exception("Unexpected error message: " + str(e))
288
289 try:
290 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
291 dbus.ByteArray('', variant_level=2),
292 dbus_interface=dbus.PROPERTIES_IFACE)
293 raise Exception("Invalid Set accepted")
294 except dbus.exceptions.DBusException, e:
295 if "InvalidArgs: invalid message format" not in str(e):
296 raise Exception("Unexpected error message: " + str(e))
297
298def test_dbus_invalid_method(dev, apdev):
299 """D-Bus invalid method"""
910eb5b5 300 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
301 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
302
303 try:
304 wps.Foo()
305 raise Exception("Unknown method accepted")
306 except dbus.exceptions.DBusException, e:
307 if "UnknownMethod" not in str(e):
308 raise Exception("Unexpected error message: " + str(e))
309
310 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
311 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
312 try:
313 test_wps.Start(123)
314 raise Exception("WPS.Start with incorrect signature accepted")
315 except dbus.exceptions.DBusException, e:
316 if "InvalidArgs: Invalid arg" not in str(e):
317 raise Exception("Unexpected error message: " + str(e))
318
319def test_dbus_get_set_wps(dev, apdev):
320 """D-Bus Get/Set for WPS properties"""
321 try:
81e787b7 322 _test_dbus_get_set_wps(dev, apdev)
2ec82e67
JM
323 finally:
324 dev[0].request("SET wps_cred_processing 0")
364e28c9 325 dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
2ec82e67
JM
326
327def _test_dbus_get_set_wps(dev, apdev):
910eb5b5 328 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
329
330 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
331 dbus_interface=dbus.PROPERTIES_IFACE)
332
333 val = "display keypad virtual_display nfc_interface"
334 dev[0].request("SET config_methods " + val)
335
336 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
337 dbus_interface=dbus.PROPERTIES_IFACE)
338 if config != val:
339 raise Exception("Unexpected Get(ConfigMethods) result: " + config)
340
341 val2 = "push_button display"
342 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
343 dbus_interface=dbus.PROPERTIES_IFACE)
344 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
345 dbus_interface=dbus.PROPERTIES_IFACE)
346 if config != val2:
347 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)
348
349 dev[0].request("SET config_methods " + val)
350
351 for i in range(3):
352 dev[0].request("SET wps_cred_processing " + str(i))
353 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
354 dbus_interface=dbus.PROPERTIES_IFACE)
355 expected_val = False if i == 1 else True
356 if val != expected_val:
357 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))
358
359 class TestDbusGetSet(TestDbus):
360 def __init__(self, bus):
361 TestDbus.__init__(self, bus)
362 self.signal_received = False
363 self.signal_received_deprecated = False
364 self.sets_done = False
365
366 def __enter__(self):
367 gobject.timeout_add(1, self.run_sets)
368 gobject.timeout_add(1000, self.timeout)
369 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
370 "PropertiesChanged")
371 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
372 "PropertiesChanged")
373 self.loop.run()
374 return self
375
376 def propertiesChanged(self, properties):
377 logger.debug("PropertiesChanged: " + str(properties))
378 if properties.has_key("ProcessCredentials"):
379 self.signal_received_deprecated = True
380 if self.sets_done and self.signal_received:
381 self.loop.quit()
382
383 def propertiesChanged2(self, interface_name, changed_properties,
384 invalidated_properties):
385 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
386 if interface_name != WPAS_DBUS_IFACE_WPS:
387 return
388 if changed_properties.has_key("ProcessCredentials"):
389 self.signal_received = True
390 if self.sets_done and self.signal_received_deprecated:
391 self.loop.quit()
392
393 def run_sets(self, *args):
394 logger.debug("run_sets")
395 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
396 dbus.Boolean(1),
397 dbus_interface=dbus.PROPERTIES_IFACE)
398 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
399 dbus_interface=dbus.PROPERTIES_IFACE) != True:
400 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
401 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
402 dbus.Boolean(0),
403 dbus_interface=dbus.PROPERTIES_IFACE)
404 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
405 dbus_interface=dbus.PROPERTIES_IFACE) != False:
406 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
407
408 self.dbus_sets_done = True
409 return False
410
411 def success(self):
412 return self.signal_received and self.signal_received_deprecated
413
414 with TestDbusGetSet(bus) as t:
415 if not t.success():
416 raise Exception("No signal received for ProcessCredentials change")
417
418def test_dbus_wps_invalid(dev, apdev):
419 """D-Bus invaldi WPS operation"""
910eb5b5 420 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
421 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
422
423 failures = [ {'Role': 'foo', 'Type': 'pbc'},
424 {'Role': 123, 'Type': 'pbc'},
425 {'Type': 'pbc'},
426 {'Role': 'enrollee'},
427 {'Role': 'registrar'},
428 {'Role': 'enrollee', 'Type': 123},
429 {'Role': 'enrollee', 'Type': 'foo'},
430 {'Role': 'enrollee', 'Type': 'pbc',
431 'Bssid': '02:33:44:55:66:77'},
432 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
433 {'Role': 'enrollee', 'Type': 'pbc',
434 'Bssid': dbus.ByteArray('12345')},
435 {'Role': 'enrollee', 'Type': 'pbc',
436 'P2PDeviceAddress': 12345},
437 {'Role': 'enrollee', 'Type': 'pbc',
438 'P2PDeviceAddress': dbus.ByteArray('12345')},
439 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
440 for args in failures:
441 try:
442 wps.Start(args)
443 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
444 except dbus.exceptions.DBusException, e:
445 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
446 raise Exception("Unexpected error message: " + str(e))
447
0e126c6d
JM
448def test_dbus_wps_oom(dev, apdev):
449 """D-Bus WPS operation (OOM)"""
450 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
451 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
452
453 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
454 if_obj.Get(WPAS_DBUS_IFACE, "State",
455 dbus_interface=dbus.PROPERTIES_IFACE)
456
457 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
458 bssid = apdev[0]['bssid']
459 dev[0].scan_for_bss(bssid, freq=2412)
460
461 for i in range(1, 3):
462 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
463 if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
464 dbus_interface=dbus.PROPERTIES_IFACE)
465
466 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
467 dbus_interface=dbus.PROPERTIES_IFACE)
468 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
469 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
470 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
471 dbus_interface=dbus.PROPERTIES_IFACE)
472
473 id = dev[0].add_network()
474 dev[0].set_network(id, "disabled", "0")
475 dev[0].set_network_quoted(id, "ssid", "test")
476
477 for i in range(1, 3):
478 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
479 if_obj.Get(WPAS_DBUS_IFACE, "Networks",
480 dbus_interface=dbus.PROPERTIES_IFACE)
481
482 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
483 dbus_get(dbus, wpas_obj, "Interfaces")
484
485 for i in range(1, 6):
486 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
487 dbus_get(dbus, wpas_obj, "EapMethods")
488
489 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
490 expected="Error.Failed: Failed to set property"):
491 val2 = "push_button display"
492 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
493 dbus_interface=dbus.PROPERTIES_IFACE)
494
495 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
496 "WPS.Start",
497 expected="UnknownError: WPS start failed"):
498 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
499
2ec82e67
JM
500def test_dbus_wps_pbc(dev, apdev):
501 """D-Bus WPS/PBC operation and signals"""
502 try:
81e787b7 503 _test_dbus_wps_pbc(dev, apdev)
2ec82e67
JM
504 finally:
505 dev[0].request("SET wps_cred_processing 0")
506
507def _test_dbus_wps_pbc(dev, apdev):
910eb5b5 508 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
509 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
510
511 hapd = start_ap(apdev[0])
512 hapd.request("WPS_PBC")
513 bssid = apdev[0]['bssid']
514 dev[0].scan_for_bss(bssid, freq="2412")
515 dev[0].request("SET wps_cred_processing 2")
516
517 class TestDbusWps(TestDbus):
518 def __init__(self, bus, wps):
519 TestDbus.__init__(self, bus)
520 self.success_seen = False
521 self.credentials_received = False
522 self.wps = wps
523
524 def __enter__(self):
525 gobject.timeout_add(1, self.start_pbc)
526 gobject.timeout_add(15000, self.timeout)
527 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
528 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
529 "Credentials")
530 self.loop.run()
531 return self
532
533 def wpsEvent(self, name, args):
534 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
535 if name == "success":
536 self.success_seen = True
537 if self.credentials_received:
538 self.loop.quit()
539
540 def credentials(self, args):
541 logger.debug("credentials: " + str(args))
542 self.credentials_received = True
543 if self.success_seen:
544 self.loop.quit()
545
546 def start_pbc(self, *args):
547 logger.debug("start_pbc")
548 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
549 return False
550
551 def success(self):
552 return self.success_seen and self.credentials_received
553
554 with TestDbusWps(bus, wps) as t:
555 if not t.success():
556 raise Exception("Failure in D-Bus operations")
557
558 dev[0].wait_connected(timeout=10)
559 dev[0].request("DISCONNECT")
560 hapd.disable()
561 dev[0].flush_scan_cache()
562
563def test_dbus_wps_pin(dev, apdev):
564 """D-Bus WPS/PIN operation and signals"""
565 try:
81e787b7 566 _test_dbus_wps_pin(dev, apdev)
2ec82e67
JM
567 finally:
568 dev[0].request("SET wps_cred_processing 0")
569
570def _test_dbus_wps_pin(dev, apdev):
910eb5b5 571 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
572 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
573
574 hapd = start_ap(apdev[0])
575 hapd.request("WPS_PIN any 12345670")
576 bssid = apdev[0]['bssid']
577 dev[0].scan_for_bss(bssid, freq="2412")
578 dev[0].request("SET wps_cred_processing 2")
579
580 class TestDbusWps(TestDbus):
581 def __init__(self, bus):
582 TestDbus.__init__(self, bus)
583 self.success_seen = False
584 self.credentials_received = False
585
586 def __enter__(self):
587 gobject.timeout_add(1, self.start_pin)
588 gobject.timeout_add(15000, self.timeout)
589 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
590 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
591 "Credentials")
592 self.loop.run()
593 return self
594
595 def wpsEvent(self, name, args):
596 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
597 if name == "success":
598 self.success_seen = True
599 if self.credentials_received:
600 self.loop.quit()
601
602 def credentials(self, args):
603 logger.debug("credentials: " + str(args))
604 self.credentials_received = True
605 if self.success_seen:
606 self.loop.quit()
607
608 def start_pin(self, *args):
609 logger.debug("start_pin")
610 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
611 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
612 'Bssid': bssid_ay})
613 return False
614
615 def success(self):
616 return self.success_seen and self.credentials_received
617
618 with TestDbusWps(bus) as t:
619 if not t.success():
620 raise Exception("Failure in D-Bus operations")
621
622 dev[0].wait_connected(timeout=10)
623
624def test_dbus_wps_pin2(dev, apdev):
625 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
626 try:
81e787b7 627 _test_dbus_wps_pin2(dev, apdev)
2ec82e67
JM
628 finally:
629 dev[0].request("SET wps_cred_processing 0")
630
631def _test_dbus_wps_pin2(dev, apdev):
910eb5b5 632 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
633 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
634
635 hapd = start_ap(apdev[0])
636 bssid = apdev[0]['bssid']
637 dev[0].scan_for_bss(bssid, freq="2412")
638 dev[0].request("SET wps_cred_processing 2")
639
640 class TestDbusWps(TestDbus):
641 def __init__(self, bus):
642 TestDbus.__init__(self, bus)
643 self.success_seen = False
644 self.failed = False
645
646 def __enter__(self):
647 gobject.timeout_add(1, self.start_pin)
648 gobject.timeout_add(15000, self.timeout)
649 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
650 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
651 "Credentials")
652 self.loop.run()
653 return self
654
655 def wpsEvent(self, name, args):
656 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
657 if name == "success":
658 self.success_seen = True
659 if self.credentials_received:
660 self.loop.quit()
661
662 def credentials(self, args):
663 logger.debug("credentials: " + str(args))
664 self.credentials_received = True
665 if self.success_seen:
666 self.loop.quit()
667
668 def start_pin(self, *args):
669 logger.debug("start_pin")
670 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
671 res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
672 'Bssid': bssid_ay})
673 pin = res['Pin']
674 h = hostapd.Hostapd(apdev[0]['ifname'])
675 h.request("WPS_PIN any " + pin)
676 return False
677
678 def success(self):
679 return self.success_seen and self.credentials_received
680
681 with TestDbusWps(bus) as t:
682 if not t.success():
683 raise Exception("Failure in D-Bus operations")
684
685 dev[0].wait_connected(timeout=10)
686
687def test_dbus_wps_pin_m2d(dev, apdev):
688 """D-Bus WPS/PIN operation and signals with M2D"""
689 try:
81e787b7 690 _test_dbus_wps_pin_m2d(dev, apdev)
2ec82e67
JM
691 finally:
692 dev[0].request("SET wps_cred_processing 0")
693
694def _test_dbus_wps_pin_m2d(dev, apdev):
910eb5b5 695 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
696 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
697
698 hapd = start_ap(apdev[0])
699 bssid = apdev[0]['bssid']
700 dev[0].scan_for_bss(bssid, freq="2412")
701 dev[0].request("SET wps_cred_processing 2")
702
703 class TestDbusWps(TestDbus):
704 def __init__(self, bus):
705 TestDbus.__init__(self, bus)
706 self.success_seen = False
707 self.credentials_received = False
708
709 def __enter__(self):
710 gobject.timeout_add(1, self.start_pin)
711 gobject.timeout_add(15000, self.timeout)
712 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
713 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
714 "Credentials")
715 self.loop.run()
716 return self
717
718 def wpsEvent(self, name, args):
719 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
720 if name == "success":
721 self.success_seen = True
722 if self.credentials_received:
723 self.loop.quit()
724 elif name == "m2d":
725 h = hostapd.Hostapd(apdev[0]['ifname'])
726 h.request("WPS_PIN any 12345670")
727
728 def credentials(self, args):
729 logger.debug("credentials: " + str(args))
730 self.credentials_received = True
731 if self.success_seen:
732 self.loop.quit()
733
734 def start_pin(self, *args):
735 logger.debug("start_pin")
736 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
737 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
738 'Bssid': bssid_ay})
739 return False
740
741 def success(self):
742 return self.success_seen and self.credentials_received
743
744 with TestDbusWps(bus) as t:
745 if not t.success():
746 raise Exception("Failure in D-Bus operations")
747
748 dev[0].wait_connected(timeout=10)
749
750def test_dbus_wps_reg(dev, apdev):
751 """D-Bus WPS/Registrar operation and signals"""
752 try:
81e787b7 753 _test_dbus_wps_reg(dev, apdev)
2ec82e67
JM
754 finally:
755 dev[0].request("SET wps_cred_processing 0")
756
757def _test_dbus_wps_reg(dev, apdev):
910eb5b5 758 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
759 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
760
761 hapd = start_ap(apdev[0])
762 hapd.request("WPS_PIN any 12345670")
763 bssid = apdev[0]['bssid']
764 dev[0].scan_for_bss(bssid, freq="2412")
765 dev[0].request("SET wps_cred_processing 2")
766
767 class TestDbusWps(TestDbus):
768 def __init__(self, bus):
769 TestDbus.__init__(self, bus)
770 self.credentials_received = False
771
772 def __enter__(self):
773 gobject.timeout_add(100, self.start_reg)
774 gobject.timeout_add(15000, self.timeout)
775 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
776 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
777 "Credentials")
778 self.loop.run()
779 return self
780
781 def wpsEvent(self, name, args):
782 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
783
784 def credentials(self, args):
785 logger.debug("credentials: " + str(args))
786 self.credentials_received = True
787 self.loop.quit()
788
789 def start_reg(self, *args):
790 logger.debug("start_reg")
791 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
792 wps.Start({'Role': 'registrar', 'Type': 'pin',
793 'Pin': '12345670', 'Bssid': bssid_ay})
794 return False
795
796 def success(self):
797 return self.credentials_received
798
799 with TestDbusWps(bus) as t:
800 if not t.success():
801 raise Exception("Failure in D-Bus operations")
802
803 dev[0].wait_connected(timeout=10)
804
805def test_dbus_scan_invalid(dev, apdev):
806 """D-Bus invalid scan method"""
910eb5b5 807 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
808 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
809
810 tests = [ ({}, "InvalidArgs"),
811 ({'Type': 123}, "InvalidArgs"),
812 ({'Type': 'foo'}, "InvalidArgs"),
813 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
814 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
815 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
816 ({'Type': 'active',
817 'SSIDs': [ dbus.ByteArray("1"), dbus.ByteArray("2"),
818 dbus.ByteArray("3"), dbus.ByteArray("4"),
819 dbus.ByteArray("5"), dbus.ByteArray("6"),
820 dbus.ByteArray("7"), dbus.ByteArray("8"),
821 dbus.ByteArray("9"), dbus.ByteArray("10"),
822 dbus.ByteArray("11"), dbus.ByteArray("12"),
823 dbus.ByteArray("13"), dbus.ByteArray("14"),
824 dbus.ByteArray("15"), dbus.ByteArray("16"),
825 dbus.ByteArray("17") ]},
826 "InvalidArgs"),
827 ({'Type': 'active',
828 'SSIDs': [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]},
829 "InvalidArgs"),
830 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
831 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
832 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
833 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
834 ({'Type': 'active',
835 'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
836 "InvalidArgs"),
837 ({'Type': 'active',
838 'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
839 "InvalidArgs"),
840 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
841 ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
842 "InvalidArgs"),
843 ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
844 "InvalidArgs")]
845 for (t,err) in tests:
846 try:
847 iface.Scan(t)
848 raise Exception("Invalid Scan() arguments accepted: " + str(t))
849 except dbus.exceptions.DBusException, e:
850 if err not in str(e):
851 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
852
0e126c6d
JM
853def test_dbus_scan_oom(dev, apdev):
854 """D-Bus scan method and OOM"""
855 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
856 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
857
858 with alloc_fail_dbus(dev[0], 1,
859 "wpa_scan_clone_params;wpas_dbus_handler_scan",
860 "Scan", expected="ScanError: Scan request rejected"):
861 iface.Scan({ 'Type': 'passive',
862 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
863
864 with alloc_fail_dbus(dev[0], 1,
865 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
866 "Scan"):
867 iface.Scan({ 'Type': 'passive',
868 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
869
870 with alloc_fail_dbus(dev[0], 1,
871 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
872 "Scan"):
873 iface.Scan({ 'Type': 'active',
874 'IEs': [ dbus.ByteArray("\xdd\x00") ],
875 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
876
877 with alloc_fail_dbus(dev[0], 1,
878 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
879 "Scan"):
880 iface.Scan({ 'Type': 'active',
881 'SSIDs': [ dbus.ByteArray("open"),
882 dbus.ByteArray() ],
883 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
884
2ec82e67
JM
885def test_dbus_scan(dev, apdev):
886 """D-Bus scan and related signals"""
910eb5b5 887 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
888 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
889
890 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
891
892 class TestDbusScan(TestDbus):
893 def __init__(self, bus):
894 TestDbus.__init__(self, bus)
895 self.scan_completed = 0
896 self.bss_added = False
897
898 def __enter__(self):
899 gobject.timeout_add(1, self.run_scan)
900 gobject.timeout_add(15000, self.timeout)
901 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
902 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
903 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
904 self.loop.run()
905 return self
906
907 def scanDone(self, success):
908 logger.debug("scanDone: success=%s" % success)
909 self.scan_completed += 1
910 if self.scan_completed == 1:
911 iface.Scan({'Type': 'passive',
912 'AllowRoam': True,
913 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
914 elif self.scan_completed == 2:
915 iface.Scan({'Type': 'passive',
916 'AllowRoam': False})
917 elif self.bss_added and self.scan_completed == 3:
918 self.loop.quit()
919
920 def bssAdded(self, bss, properties):
921 logger.debug("bssAdded: %s" % bss)
922 logger.debug(str(properties))
923 self.bss_added = True
924 if self.scan_completed == 3:
925 self.loop.quit()
926
927 def bssRemoved(self, bss):
928 logger.debug("bssRemoved: %s" % bss)
929
930 def run_scan(self, *args):
931 logger.debug("run_scan")
932 iface.Scan({'Type': 'active',
933 'SSIDs': [ dbus.ByteArray("open"),
934 dbus.ByteArray() ],
935 'IEs': [ dbus.ByteArray("\xdd\x00"),
936 dbus.ByteArray() ],
937 'AllowRoam': False,
938 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
939 return False
940
941 def success(self):
942 return self.scan_completed == 3 and self.bss_added
943
944 with TestDbusScan(bus) as t:
945 if not t.success():
946 raise Exception("Expected signals not seen")
947
948 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
949 dbus_interface=dbus.PROPERTIES_IFACE)
950 if len(res) < 1:
951 raise Exception("Scan result not in BSSs property")
952 iface.FlushBSS(0)
953 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
954 dbus_interface=dbus.PROPERTIES_IFACE)
955 if len(res) != 0:
956 raise Exception("FlushBSS() did not remove scan results from BSSs property")
957 iface.FlushBSS(1)
958
959def test_dbus_scan_busy(dev, apdev):
960 """D-Bus scan trigger rejection when busy with previous scan"""
910eb5b5 961 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
962 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
963
964 if "OK" not in dev[0].request("SCAN freq=2412-2462"):
965 raise Exception("Failed to start scan")
966 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
967 if ev is None:
968 raise Exception("Scan start timed out")
969
970 try:
971 iface.Scan({'Type': 'active', 'AllowRoam': False})
972 raise Exception("Scan() accepted when busy")
973 except dbus.exceptions.DBusException, e:
974 if "ScanError: Scan request reject" not in str(e):
975 raise Exception("Unexpected error message: " + str(e))
976
977 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
978 if ev is None:
979 raise Exception("Scan timed out")
980
981def test_dbus_connect(dev, apdev):
982 """D-Bus AddNetwork and connect"""
910eb5b5 983 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
984 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
985
986 ssid = "test-wpa2-psk"
987 passphrase = 'qwertyuiop'
988 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
989 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
990
991 class TestDbusConnect(TestDbus):
992 def __init__(self, bus):
993 TestDbus.__init__(self, bus)
994 self.network_added = False
995 self.network_selected = False
996 self.network_removed = False
997 self.state = 0
998
999 def __enter__(self):
1000 gobject.timeout_add(1, self.run_connect)
1001 gobject.timeout_add(15000, self.timeout)
1002 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1003 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1004 "NetworkRemoved")
1005 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1006 "NetworkSelected")
1007 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1008 "PropertiesChanged")
1009 self.loop.run()
1010 return self
1011
1012 def networkAdded(self, network, properties):
1013 logger.debug("networkAdded: %s" % str(network))
1014 logger.debug(str(properties))
1015 self.network_added = True
1016
1017 def networkRemoved(self, network):
1018 logger.debug("networkRemoved: %s" % str(network))
1019 self.network_removed = True
1020
1021 def networkSelected(self, network):
1022 logger.debug("networkSelected: %s" % str(network))
1023 self.network_selected = True
1024
1025 def propertiesChanged(self, properties):
1026 logger.debug("propertiesChanged: %s" % str(properties))
1027 if 'State' in properties and properties['State'] == "completed":
1028 if self.state == 0:
1029 self.state = 1
1030 iface.Disconnect()
1031 elif self.state == 2:
1032 self.state = 3
1033 iface.Disconnect()
1034 elif self.state == 4:
1035 self.state = 5
1036 iface.Reattach()
1037 elif self.state == 5:
1038 self.state = 6
1039 res = iface.SignalPoll()
1040 logger.debug("SignalPoll: " + str(res))
1041 if 'frequency' not in res or res['frequency'] != 2412:
1042 self.state = -1
1043 logger.info("Unexpected SignalPoll result")
1044 iface.RemoveNetwork(self.netw)
1045 if 'State' in properties and properties['State'] == "disconnected":
1046 if self.state == 1:
1047 self.state = 2
1048 iface.SelectNetwork(self.netw)
1049 elif self.state == 3:
1050 self.state = 4
1051 iface.Reassociate()
1052 elif self.state == 6:
1053 self.state = 7
1054 self.loop.quit()
1055
1056 def run_connect(self, *args):
1057 logger.debug("run_connect")
1058 args = dbus.Dictionary({ 'ssid': ssid,
1059 'key_mgmt': 'WPA-PSK',
1060 'psk': passphrase,
1061 'scan_freq': 2412 },
1062 signature='sv')
1063 self.netw = iface.AddNetwork(args)
1064 iface.SelectNetwork(self.netw)
1065 return False
1066
1067 def success(self):
1068 if not self.network_added or \
1069 not self.network_removed or \
1070 not self.network_selected:
1071 return False
1072 return self.state == 7
1073
1074 with TestDbusConnect(bus) as t:
1075 if not t.success():
1076 raise Exception("Expected signals not seen")
1077
0e126c6d
JM
1078def test_dbus_connect_oom(dev, apdev):
1079 """D-Bus AddNetwork and connect when out-of-memory"""
1080 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1081 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1082
1083 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
1084 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1085
1086 ssid = "test-wpa2-psk"
1087 passphrase = 'qwertyuiop'
1088 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1089 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1090
1091 class TestDbusConnect(TestDbus):
1092 def __init__(self, bus):
1093 TestDbus.__init__(self, bus)
1094 self.network_added = False
1095 self.network_selected = False
1096 self.network_removed = False
1097 self.state = 0
1098
1099 def __enter__(self):
1100 gobject.timeout_add(1, self.run_connect)
1101 gobject.timeout_add(1500, self.timeout)
1102 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1103 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1104 "NetworkRemoved")
1105 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1106 "NetworkSelected")
1107 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1108 "PropertiesChanged")
1109 self.loop.run()
1110 return self
1111
1112 def networkAdded(self, network, properties):
1113 logger.debug("networkAdded: %s" % str(network))
1114 logger.debug(str(properties))
1115 self.network_added = True
1116
1117 def networkRemoved(self, network):
1118 logger.debug("networkRemoved: %s" % str(network))
1119 self.network_removed = True
1120
1121 def networkSelected(self, network):
1122 logger.debug("networkSelected: %s" % str(network))
1123 self.network_selected = True
1124
1125 def propertiesChanged(self, properties):
1126 logger.debug("propertiesChanged: %s" % str(properties))
1127 if 'State' in properties and properties['State'] == "completed":
1128 if self.state == 0:
1129 self.state = 1
1130 iface.Disconnect()
1131 elif self.state == 2:
1132 self.state = 3
1133 iface.Disconnect()
1134 elif self.state == 4:
1135 self.state = 5
1136 iface.Reattach()
1137 elif self.state == 5:
1138 self.state = 6
1139 res = iface.SignalPoll()
1140 logger.debug("SignalPoll: " + str(res))
1141 if 'frequency' not in res or res['frequency'] != 2412:
1142 self.state = -1
1143 logger.info("Unexpected SignalPoll result")
1144 iface.RemoveNetwork(self.netw)
1145 if 'State' in properties and properties['State'] == "disconnected":
1146 if self.state == 1:
1147 self.state = 2
1148 iface.SelectNetwork(self.netw)
1149 elif self.state == 3:
1150 self.state = 4
1151 iface.Reassociate()
1152 elif self.state == 6:
1153 self.state = 7
1154 self.loop.quit()
1155
1156 def run_connect(self, *args):
1157 logger.debug("run_connect")
1158 args = dbus.Dictionary({ 'ssid': ssid,
1159 'key_mgmt': 'WPA-PSK',
1160 'psk': passphrase,
1161 'scan_freq': 2412 },
1162 signature='sv')
1163 try:
1164 self.netw = iface.AddNetwork(args)
1165 except Exception, e:
1166 logger.info("Exception on AddNetwork: " + str(e))
1167 self.loop.quit()
1168 return False
1169 try:
1170 iface.SelectNetwork(self.netw)
1171 except Exception, e:
1172 logger.info("Exception on SelectNetwork: " + str(e))
1173 self.loop.quit()
1174
1175 return False
1176
1177 def success(self):
1178 if not self.network_added or \
1179 not self.network_removed or \
1180 not self.network_selected:
1181 return False
1182 return self.state == 7
1183
1184 count = 0
1185 for i in range(1, 1000):
1186 for j in range(3):
1187 dev[j].dump_monitor()
1188 dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
1189 try:
1190 with TestDbusConnect(bus) as t:
1191 if not t.success():
1192 logger.info("Iteration %d - Expected signals not seen" % i)
1193 else:
1194 logger.info("Iteration %d - success" % i)
1195
1196 state = dev[0].request('GET_ALLOC_FAIL')
1197 logger.info("GET_ALLOC_FAIL: " + state)
1198 dev[0].dump_monitor()
1199 dev[0].request("TEST_ALLOC_FAIL 0:")
1200 if i < 3:
1201 raise Exception("Connection succeeded during out-of-memory")
1202 if not state.startswith('0:'):
1203 count += 1
1204 if count == 5:
1205 break
1206 except:
1207 pass
2ec82e67
JM
1208
1209def test_dbus_while_not_connected(dev, apdev):
1210 """D-Bus invalid operations while not connected"""
910eb5b5 1211 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1212 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1213
1214 try:
1215 iface.Disconnect()
1216 raise Exception("Disconnect() accepted when not connected")
1217 except dbus.exceptions.DBusException, e:
1218 if "NotConnected" not in str(e):
1219 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
1220
1221 try:
1222 iface.Reattach()
1223 raise Exception("Reattach() accepted when not connected")
1224 except dbus.exceptions.DBusException, e:
1225 if "NotConnected" not in str(e):
1226 raise Exception("Unexpected error message for invalid Reattach: " + str(e))
1227
1228def test_dbus_connect_eap(dev, apdev):
1229 """D-Bus AddNetwork and connect to EAP network"""
910eb5b5 1230 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1231 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1232
1233 ssid = "ieee8021x-open"
1234 params = hostapd.radius_params()
1235 params["ssid"] = ssid
1236 params["ieee8021x"] = "1"
1237 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1238
1239 class TestDbusConnect(TestDbus):
1240 def __init__(self, bus):
1241 TestDbus.__init__(self, bus)
1242 self.certification_received = False
1243 self.eap_status = False
1244 self.state = 0
1245
1246 def __enter__(self):
1247 gobject.timeout_add(1, self.run_connect)
1248 gobject.timeout_add(15000, self.timeout)
1249 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1250 "PropertiesChanged")
1251 self.add_signal(self.certification, WPAS_DBUS_IFACE,
2099fed4 1252 "Certification", byte_arrays=True)
2ec82e67
JM
1253 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1254 "NetworkRequest")
1255 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
1256 self.loop.run()
1257 return self
1258
1259 def propertiesChanged(self, properties):
1260 logger.debug("propertiesChanged: %s" % str(properties))
1261 if 'State' in properties and properties['State'] == "completed":
1262 if self.state == 0:
1263 self.state = 1
1264 iface.EAPLogoff()
2099fed4
JM
1265 logger.info("Set dNSName constraint")
1266 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1267 args = dbus.Dictionary({ 'altsubject_match':
1268 self.server_dnsname },
1269 signature='sv')
1270 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1271 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
1272 elif self.state == 2:
1273 self.state = 3
2099fed4
JM
1274 iface.Disconnect()
1275 logger.info("Set non-matching dNSName constraint")
1276 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1277 args = dbus.Dictionary({ 'altsubject_match':
1278 self.server_dnsname + "FOO" },
1279 signature='sv')
1280 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1281 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
1282 if 'State' in properties and properties['State'] == "disconnected":
1283 if self.state == 1:
1284 self.state = 2
1285 iface.EAPLogon()
1286 iface.SelectNetwork(self.netw)
2099fed4
JM
1287 if self.state == 3:
1288 self.state = 4
1289 iface.SelectNetwork(self.netw)
2ec82e67
JM
1290
1291 def certification(self, args):
1292 logger.debug("certification: %s" % str(args))
1293 self.certification_received = True
2099fed4
JM
1294 if args['depth'] == 0:
1295 # The test server certificate is supposed to have dNSName
1296 if len(args['altsubject']) < 1:
1297 raise Exception("Missing dNSName")
1298 dnsname = args['altsubject'][0]
1299 if not dnsname.startswith("DNS:"):
1300 raise Exception("Expected dNSName not found: " + dnsname)
1301 logger.info("altsubject: " + dnsname)
1302 self.server_dnsname = dnsname
2ec82e67
JM
1303
1304 def eap(self, status, parameter):
1305 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
1306 if status == 'completion' and parameter == 'success':
1307 self.eap_status = True
2099fed4
JM
1308 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch':
1309 self.state = 5
1310 self.loop.quit()
2ec82e67
JM
1311
1312 def networkRequest(self, path, field, txt):
1313 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1314 if field == "PASSWORD":
1315 iface.NetworkReply(path, field, "password")
1316
1317 def run_connect(self, *args):
1318 logger.debug("run_connect")
1319 args = dbus.Dictionary({ 'ssid': ssid,
1320 'key_mgmt': 'IEEE8021X',
1321 'eapol_flags': 0,
1322 'eap': 'TTLS',
1323 'anonymous_identity': 'ttls',
1324 'identity': 'pap user',
1325 'ca_cert': 'auth_serv/ca.pem',
1326 'phase2': 'auth=PAP',
1327 'scan_freq': 2412 },
1328 signature='sv')
1329 self.netw = iface.AddNetwork(args)
1330 iface.SelectNetwork(self.netw)
1331 return False
1332
1333 def success(self):
1334 if not self.eap_status or not self.certification_received:
1335 return False
2099fed4 1336 return self.state == 5
2ec82e67
JM
1337
1338 with TestDbusConnect(bus) as t:
1339 if not t.success():
1340 raise Exception("Expected signals not seen")
1341
1342def test_dbus_network(dev, apdev):
1343 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
910eb5b5 1344 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1345 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1346
1347 args = dbus.Dictionary({ 'ssid': "foo",
1348 'key_mgmt': 'WPA-PSK',
1349 'psk': "12345678",
1350 'identity': dbus.ByteArray([ 1, 2 ]),
1351 'priority': dbus.Int32(0),
1352 'scan_freq': dbus.UInt32(2412) },
1353 signature='sv')
1354 netw = iface.AddNetwork(args)
1355 iface.RemoveNetwork(netw)
1356 try:
1357 iface.RemoveNetwork(netw)
1358 raise Exception("Invalid RemoveNetwork() accepted")
1359 except dbus.exceptions.DBusException, e:
1360 if "NetworkUnknown" not in str(e):
1361 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1362 try:
1363 iface.SelectNetwork(netw)
1364 raise Exception("Invalid SelectNetwork() accepted")
1365 except dbus.exceptions.DBusException, e:
1366 if "NetworkUnknown" not in str(e):
1367 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1368
1369 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1370 'identity': "testuser", 'scan_freq': '2412' },
1371 signature='sv')
1372 netw1 = iface.AddNetwork(args)
1373 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1374 signature='sv')
1375 netw2 = iface.AddNetwork(args)
1376 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1377 dbus_interface=dbus.PROPERTIES_IFACE)
1378 if len(res) != 2:
1379 raise Exception("Unexpected number of networks")
1380
1381 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1382 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1383 dbus_interface=dbus.PROPERTIES_IFACE)
1384 if res != False:
1385 raise Exception("Added network was unexpectedly enabled by default")
1386 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
1387 dbus_interface=dbus.PROPERTIES_IFACE)
1388 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1389 dbus_interface=dbus.PROPERTIES_IFACE)
1390 if res != True:
1391 raise Exception("Set(Enabled,True) did not seem to change property value")
1392 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
1393 dbus_interface=dbus.PROPERTIES_IFACE)
1394 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1395 dbus_interface=dbus.PROPERTIES_IFACE)
1396 if res != False:
1397 raise Exception("Set(Enabled,False) did not seem to change property value")
1398 try:
1399 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
1400 dbus_interface=dbus.PROPERTIES_IFACE)
1401 raise Exception("Invalid Set(Enabled,1) accepted")
1402 except dbus.exceptions.DBusException, e:
1403 if "Error.Failed: wrong property type" not in str(e):
1404 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
1405
1406 args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
1407 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1408 dbus_interface=dbus.PROPERTIES_IFACE)
1409 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1410 dbus_interface=dbus.PROPERTIES_IFACE)
1411 if res['ssid'] != '"foo1new"':
1412 raise Exception("Set(Properties) failed to update ssid")
1413 if res['identity'] != '"testuser"':
1414 raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1415
1416 iface.RemoveAllNetworks()
1417 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1418 dbus_interface=dbus.PROPERTIES_IFACE)
1419 if len(res) != 0:
1420 raise Exception("Unexpected number of networks")
1421 iface.RemoveAllNetworks()
1422
1423 tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
1424 dbus.Dictionary({ 'identity': dbus.ByteArray() },
1425 signature='sv'),
1426 dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
1427 dbus.Dictionary({ 'identity': "" }, signature='sv') ]
1428 for args in tests:
1429 try:
1430 iface.AddNetwork(args)
1431 raise Exception("Invalid AddNetwork args accepted: " + str(args))
1432 except dbus.exceptions.DBusException, e:
1433 if "InvalidArgs" not in str(e):
1434 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1435
0e126c6d
JM
1436def test_dbus_network_oom(dev, apdev):
1437 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1438 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1439 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1440
1441 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1442 'identity': "testuser", 'scan_freq': '2412' },
1443 signature='sv')
1444 netw1 = iface.AddNetwork(args)
1445 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1446
1447 with alloc_fail_dbus(dev[0], 1,
1448 "wpa_config_get_all;wpas_dbus_getter_network_properties",
1449 "Get"):
1450 net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1451 dbus_interface=dbus.PROPERTIES_IFACE)
1452
1453 iface.RemoveAllNetworks()
1454
1455 with alloc_fail_dbus(dev[0], 1,
1456 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1457 "RemoveNetwork", "InvalidArgs"):
1458 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1459
1460 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1461 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1462 signature='sv')
1463 try:
1464 netw = iface.AddNetwork(args)
1465 # Currently, AddNetwork() succeeds even if os_strdup() for path
1466 # fails, so remove the network if that occurs.
1467 iface.RemoveNetwork(netw)
1468 except dbus.exceptions.DBusException, e:
1469 pass
1470
1471 for i in range(1, 3):
1472 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
1473 try:
1474 netw = iface.AddNetwork(args)
1475 # Currently, AddNetwork() succeeds even if network registration
1476 # fails, so remove the network if that occurs.
1477 iface.RemoveNetwork(netw)
1478 except dbus.exceptions.DBusException, e:
1479 pass
1480
1481 with alloc_fail_dbus(dev[0], 1,
1482 "=wpa_config_add_network;wpas_dbus_handler_add_network",
1483 "AddNetwork",
1484 "UnknownError: wpa_supplicant could not add a network"):
1485 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1486 signature='sv')
1487 netw = iface.AddNetwork(args)
1488
1489 tests = [ (1,
1490 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1491 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1492 signature='sv')),
1493 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1494 dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
1495 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1496 dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
1497 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1498 dbus.Dictionary({ 'priority': dbus.UInt32(1) },
1499 signature='sv')),
1500 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1501 dbus.Dictionary({ 'priority': dbus.Int32(1) },
1502 signature='sv')),
1503 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1504 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1505 signature='sv')) ]
1506 for (count,funcs,args) in tests:
1507 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
1508 netw = iface.AddNetwork(args)
1509
1510 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
1511 dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
1512 raise Exception("Unexpected network block added")
1513 if len(dev[0].list_networks()) > 0:
1514 raise Exception("Unexpected network block visible")
1515
2ec82e67
JM
1516def test_dbus_interface(dev, apdev):
1517 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
910eb5b5 1518 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1519 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1520
1521 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1522 signature='sv')
1523 path = wpas.CreateInterface(params)
1524 logger.debug("New interface path: " + str(path))
1525 path2 = wpas.GetInterface("lo")
1526 if path != path2:
1527 raise Exception("Interface object mismatch")
1528
1529 params = dbus.Dictionary({ 'Ifname': 'lo',
1530 'Driver': 'none',
1531 'ConfigFile': 'foo',
1532 'BridgeIfname': 'foo', },
1533 signature='sv')
1534 try:
1535 wpas.CreateInterface(params)
1536 raise Exception("Invalid CreateInterface() accepted")
1537 except dbus.exceptions.DBusException, e:
1538 if "InterfaceExists" not in str(e):
1539 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1540
1541 wpas.RemoveInterface(path)
1542 try:
1543 wpas.RemoveInterface(path)
1544 raise Exception("Invalid RemoveInterface() accepted")
1545 except dbus.exceptions.DBusException, e:
1546 if "InterfaceUnknown" not in str(e):
1547 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1548
1549 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
1550 'Foo': 123 },
1551 signature='sv')
1552 try:
1553 wpas.CreateInterface(params)
1554 raise Exception("Invalid CreateInterface() accepted")
1555 except dbus.exceptions.DBusException, e:
1556 if "InvalidArgs" not in str(e):
1557 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1558
1559 params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
1560 try:
1561 wpas.CreateInterface(params)
1562 raise Exception("Invalid CreateInterface() accepted")
1563 except dbus.exceptions.DBusException, e:
1564 if "InvalidArgs" not in str(e):
1565 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1566
1567 try:
1568 wpas.GetInterface("lo")
1569 raise Exception("Invalid GetInterface() accepted")
1570 except dbus.exceptions.DBusException, e:
1571 if "InterfaceUnknown" not in str(e):
1572 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1573
0e126c6d
JM
1574def test_dbus_interface_oom(dev, apdev):
1575 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
1576 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1577 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1578
1579 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
1580 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1581 signature='sv')
1582 wpas.CreateInterface(params)
1583
1584 for i in range(1, 1000):
1585 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
1586 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1587 signature='sv')
1588 try:
1589 npath = wpas.CreateInterface(params)
1590 wpas.RemoveInterface(npath)
1591 logger.info("CreateInterface succeeds after %d allocation failures" % i)
1592 state = dev[0].request('GET_ALLOC_FAIL')
1593 logger.info("GET_ALLOC_FAIL: " + state)
1594 dev[0].dump_monitor()
1595 dev[0].request("TEST_ALLOC_FAIL 0:")
1596 if i < 5:
1597 raise Exception("CreateInterface succeeded during out-of-memory")
1598 if not state.startswith('0:'):
1599 break
1600 except dbus.exceptions.DBusException, e:
1601 pass
1602
1603 for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
1604 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
1605 "CreateInterface"):
1606 params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
1607 wpas.CreateInterface(params)
1608
2ec82e67
JM
1609def test_dbus_blob(dev, apdev):
1610 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
910eb5b5 1611 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1612 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1613
1614 blob = dbus.ByteArray("\x01\x02\x03")
1615 iface.AddBlob('blob1', blob)
1616 try:
1617 iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
1618 raise Exception("Invalid AddBlob() accepted")
1619 except dbus.exceptions.DBusException, e:
1620 if "BlobExists" not in str(e):
1621 raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
1622 res = iface.GetBlob('blob1')
1623 if len(res) != len(blob):
1624 raise Exception("Unexpected blob data length")
1625 for i in range(len(res)):
1626 if res[i] != dbus.Byte(blob[i]):
1627 raise Exception("Unexpected blob data")
1628 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
1629 dbus_interface=dbus.PROPERTIES_IFACE)
1630 if 'blob1' not in res:
1631 raise Exception("Added blob missing from Blobs property")
1632 iface.RemoveBlob('blob1')
1633 try:
1634 iface.RemoveBlob('blob1')
1635 raise Exception("Invalid RemoveBlob() accepted")
1636 except dbus.exceptions.DBusException, e:
1637 if "BlobUnknown" not in str(e):
1638 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
1639 try:
1640 iface.GetBlob('blob1')
1641 raise Exception("Invalid GetBlob() accepted")
1642 except dbus.exceptions.DBusException, e:
1643 if "BlobUnknown" not in str(e):
1644 raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
1645
1646 class TestDbusBlob(TestDbus):
1647 def __init__(self, bus):
1648 TestDbus.__init__(self, bus)
1649 self.blob_added = False
1650 self.blob_removed = False
1651
1652 def __enter__(self):
1653 gobject.timeout_add(1, self.run_blob)
1654 gobject.timeout_add(15000, self.timeout)
1655 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
1656 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
1657 self.loop.run()
1658 return self
1659
1660 def blobAdded(self, blobName):
1661 logger.debug("blobAdded: %s" % blobName)
1662 if blobName == 'blob2':
1663 self.blob_added = True
1664
1665 def blobRemoved(self, blobName):
1666 logger.debug("blobRemoved: %s" % blobName)
1667 if blobName == 'blob2':
1668 self.blob_removed = True
1669 self.loop.quit()
1670
1671 def run_blob(self, *args):
1672 logger.debug("run_blob")
1673 iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
1674 iface.RemoveBlob('blob2')
1675 return False
1676
1677 def success(self):
1678 return self.blob_added and self.blob_removed
1679
1680 with TestDbusBlob(bus) as t:
1681 if not t.success():
1682 raise Exception("Expected signals not seen")
1683
0e126c6d
JM
1684def test_dbus_blob_oom(dev, apdev):
1685 """D-Bus AddNetwork/RemoveNetwork OOM error cases"""
1686 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1687 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1688
1689 for i in range(1, 4):
1690 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
1691 "AddBlob"):
1692 iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
1693
2ec82e67
JM
1694def test_dbus_autoscan(dev, apdev):
1695 """D-Bus Autoscan()"""
910eb5b5 1696 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1697 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1698
1699 iface.AutoScan("foo")
1700 iface.AutoScan("periodic:1")
1701 iface.AutoScan("")
1702 dev[0].request("AUTOSCAN ")
1703
0e126c6d
JM
1704def test_dbus_autoscan_oom(dev, apdev):
1705 """D-Bus Autoscan() OOM"""
1706 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1707 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1708
1709 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
1710 iface.AutoScan("foo")
1711 dev[0].request("AUTOSCAN ")
1712
2ec82e67
JM
1713def test_dbus_tdls_invalid(dev, apdev):
1714 """D-Bus invalid TDLS operations"""
910eb5b5 1715 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1716 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1717
1718 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
1719 connect_2sta_open(dev, hapd)
1720 addr1 = dev[1].p2p_interface_addr()
1721
1722 try:
1723 iface.TDLSDiscover("foo")
1724 raise Exception("Invalid TDLSDiscover() accepted")
1725 except dbus.exceptions.DBusException, e:
1726 if "InvalidArgs" not in str(e):
1727 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e))
1728
1729 try:
1730 iface.TDLSStatus("foo")
1731 raise Exception("Invalid TDLSStatus() accepted")
1732 except dbus.exceptions.DBusException, e:
1733 if "InvalidArgs" not in str(e):
1734 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e))
1735
1736 res = iface.TDLSStatus(addr1)
1737 if res != "peer does not exist":
1738 raise Exception("Unexpected TDLSStatus response")
1739
1740 try:
1741 iface.TDLSSetup("foo")
1742 raise Exception("Invalid TDLSSetup() accepted")
1743 except dbus.exceptions.DBusException, e:
1744 if "InvalidArgs" not in str(e):
1745 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e))
1746
1747 try:
1748 iface.TDLSTeardown("foo")
1749 raise Exception("Invalid TDLSTeardown() accepted")
1750 except dbus.exceptions.DBusException, e:
1751 if "InvalidArgs" not in str(e):
1752 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e))
1753
795b6f57
JM
1754 try:
1755 iface.TDLSTeardown("00:11:22:33:44:55")
1756 raise Exception("TDLSTeardown accepted for unknown peer")
1757 except dbus.exceptions.DBusException, e:
1758 if "UnknownError: error performing TDLS teardown" not in str(e):
1759 raise Exception("Unexpected error message: " + str(e))
1760
0e126c6d
JM
1761def test_dbus_tdls_oom(dev, apdev):
1762 """D-Bus TDLS operations during OOM"""
1763 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1764 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1765
1766 with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
1767 "UnknownError: error performing TDLS setup"):
1768 iface.TDLSSetup("00:11:22:33:44:55")
1769
2ec82e67
JM
1770def test_dbus_tdls(dev, apdev):
1771 """D-Bus TDLS"""
910eb5b5 1772 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1773 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1774
1775 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
1776 connect_2sta_open(dev, hapd)
1777
1778 addr1 = dev[1].p2p_interface_addr()
1779
1780 class TestDbusTdls(TestDbus):
1781 def __init__(self, bus):
1782 TestDbus.__init__(self, bus)
1783 self.tdls_setup = False
1784 self.tdls_teardown = False
1785
1786 def __enter__(self):
1787 gobject.timeout_add(1, self.run_tdls)
1788 gobject.timeout_add(15000, self.timeout)
1789 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1790 "PropertiesChanged")
1791 self.loop.run()
1792 return self
1793
1794 def propertiesChanged(self, properties):
1795 logger.debug("propertiesChanged: %s" % str(properties))
1796
1797 def run_tdls(self, *args):
1798 logger.debug("run_tdls")
1799 iface.TDLSDiscover(addr1)
1800 gobject.timeout_add(100, self.run_tdls2)
1801 return False
1802
1803 def run_tdls2(self, *args):
1804 logger.debug("run_tdls2")
1805 iface.TDLSSetup(addr1)
1806 gobject.timeout_add(500, self.run_tdls3)
1807 return False
1808
1809 def run_tdls3(self, *args):
1810 logger.debug("run_tdls3")
1811 res = iface.TDLSStatus(addr1)
1812 if res == "connected":
1813 self.tdls_setup = True
1814 else:
1815 logger.info("Unexpected TDLSStatus: " + res)
1816 iface.TDLSTeardown(addr1)
1817 gobject.timeout_add(200, self.run_tdls4)
1818 return False
1819
1820 def run_tdls4(self, *args):
1821 logger.debug("run_tdls4")
1822 res = iface.TDLSStatus(addr1)
1823 if res == "peer does not exist":
1824 self.tdls_teardown = True
1825 else:
1826 logger.info("Unexpected TDLSStatus: " + res)
1827 self.loop.quit()
1828 return False
1829
1830 def success(self):
1831 return self.tdls_setup and self.tdls_teardown
1832
1833 with TestDbusTdls(bus) as t:
1834 if not t.success():
1835 raise Exception("Expected signals not seen")
1836
1837def test_dbus_pkcs11(dev, apdev):
1838 """D-Bus SetPKCS11EngineAndModulePath()"""
910eb5b5 1839 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1840 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1841
1842 try:
1843 iface.SetPKCS11EngineAndModulePath("foo", "bar")
1844 except dbus.exceptions.DBusException, e:
1845 if "Error.Failed: Reinit of the EAPOL" not in str(e):
1846 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
1847
1848 try:
1849 iface.SetPKCS11EngineAndModulePath("foo", "")
1850 except dbus.exceptions.DBusException, e:
1851 if "Error.Failed: Reinit of the EAPOL" not in str(e):
1852 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
1853
1854 iface.SetPKCS11EngineAndModulePath("", "bar")
1855 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
1856 dbus_interface=dbus.PROPERTIES_IFACE)
1857 if res != "":
1858 raise Exception("Unexpected PKCS11EnginePath value: " + res)
1859 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
1860 dbus_interface=dbus.PROPERTIES_IFACE)
1861 if res != "bar":
1862 raise Exception("Unexpected PKCS11ModulePath value: " + res)
1863
1864 iface.SetPKCS11EngineAndModulePath("", "")
1865 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
1866 dbus_interface=dbus.PROPERTIES_IFACE)
1867 if res != "":
1868 raise Exception("Unexpected PKCS11EnginePath value: " + res)
1869 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
1870 dbus_interface=dbus.PROPERTIES_IFACE)
1871 if res != "":
1872 raise Exception("Unexpected PKCS11ModulePath value: " + res)
1873
1874def test_dbus_apscan(dev, apdev):
1875 """D-Bus Get/Set ApScan"""
1876 try:
81e787b7 1877 _test_dbus_apscan(dev, apdev)
2ec82e67
JM
1878 finally:
1879 dev[0].request("AP_SCAN 1")
1880
1881def _test_dbus_apscan(dev, apdev):
910eb5b5 1882 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1883
1884 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
1885 dbus_interface=dbus.PROPERTIES_IFACE)
1886 if res != 1:
1887 raise Exception("Unexpected initial ApScan value: %d" % res)
1888
1889 for i in range(3):
1890 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i),
1891 dbus_interface=dbus.PROPERTIES_IFACE)
1892 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
1893 dbus_interface=dbus.PROPERTIES_IFACE)
1894 if res != i:
1895 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i))
1896
1897 try:
1898 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
1899 dbus_interface=dbus.PROPERTIES_IFACE)
1900 raise Exception("Invalid Set(ApScan,-1) accepted")
1901 except dbus.exceptions.DBusException, e:
1902 if "Error.Failed: wrong property type" not in str(e):
1903 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
1904
1905 try:
1906 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
1907 dbus_interface=dbus.PROPERTIES_IFACE)
1908 raise Exception("Invalid Set(ApScan,123) accepted")
1909 except dbus.exceptions.DBusException, e:
1910 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
1911 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
1912
1913 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
1914 dbus_interface=dbus.PROPERTIES_IFACE)
1915
1916def test_dbus_fastreauth(dev, apdev):
1917 """D-Bus Get/Set FastReauth"""
910eb5b5 1918 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1919
1920 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
1921 dbus_interface=dbus.PROPERTIES_IFACE)
1922 if res != True:
1923 raise Exception("Unexpected initial FastReauth value: " + str(res))
1924
1925 for i in [ False, True ]:
1926 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
1927 dbus_interface=dbus.PROPERTIES_IFACE)
1928 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
1929 dbus_interface=dbus.PROPERTIES_IFACE)
1930 if res != i:
1931 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))
1932
1933 try:
1934 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
1935 dbus_interface=dbus.PROPERTIES_IFACE)
1936 raise Exception("Invalid Set(FastReauth,-1) accepted")
1937 except dbus.exceptions.DBusException, e:
1938 if "Error.Failed: wrong property type" not in str(e):
1939 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
1940
1941 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
1942 dbus_interface=dbus.PROPERTIES_IFACE)
1943
1944def test_dbus_bss_expire(dev, apdev):
1945 """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
910eb5b5 1946 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1947
1948 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179),
1949 dbus_interface=dbus.PROPERTIES_IFACE)
1950 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
1951 dbus_interface=dbus.PROPERTIES_IFACE)
1952 if res != 179:
1953 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i))
1954
1955 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3),
1956 dbus_interface=dbus.PROPERTIES_IFACE)
1957 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
1958 dbus_interface=dbus.PROPERTIES_IFACE)
1959 if res != 3:
1960 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i))
1961
1962 try:
1963 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
1964 dbus_interface=dbus.PROPERTIES_IFACE)
1965 raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
1966 except dbus.exceptions.DBusException, e:
1967 if "Error.Failed: wrong property type" not in str(e):
1968 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))
1969
1970 try:
1971 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
1972 dbus_interface=dbus.PROPERTIES_IFACE)
1973 raise Exception("Invalid Set(BSSExpireAge,9) accepted")
1974 except dbus.exceptions.DBusException, e:
1975 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
1976 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))
1977
1978 try:
1979 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
1980 dbus_interface=dbus.PROPERTIES_IFACE)
1981 raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
1982 except dbus.exceptions.DBusException, e:
1983 if "Error.Failed: wrong property type" not in str(e):
1984 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))
1985
1986 try:
1987 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
1988 dbus_interface=dbus.PROPERTIES_IFACE)
1989 raise Exception("Invalid Set(BSSExpireCount,0) accepted")
1990 except dbus.exceptions.DBusException, e:
1991 if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
1992 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))
1993
1994 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
1995 dbus_interface=dbus.PROPERTIES_IFACE)
1996 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
1997 dbus_interface=dbus.PROPERTIES_IFACE)
1998
1999def test_dbus_country(dev, apdev):
2000 """D-Bus Get/Set Country"""
2001 try:
81e787b7 2002 _test_dbus_country(dev, apdev)
2ec82e67
JM
2003 finally:
2004 dev[0].request("SET country 00")
2005 subprocess.call(['iw', 'reg', 'set', '00'])
2006
2007def _test_dbus_country(dev, apdev):
910eb5b5 2008 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2009
2010 # work around issues with possible pending regdom event from the end of
2011 # the previous test case
2012 time.sleep(0.2)
2013 dev[0].dump_monitor()
2014
2015 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
2016 dbus_interface=dbus.PROPERTIES_IFACE)
2017 res = if_obj.Get(WPAS_DBUS_IFACE, "Country",
2018 dbus_interface=dbus.PROPERTIES_IFACE)
2019 if res != "FI":
2020 raise Exception("Unexpected Country value %s (expected FI)" % res)
2021
2022 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2023 if ev is None:
2024 raise Exception("regdom change event not seen")
2025 if "init=USER type=COUNTRY alpha2=FI" not in ev:
2026 raise Exception("Unexpected event contents: " + ev)
2027
2028 try:
2029 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
2030 dbus_interface=dbus.PROPERTIES_IFACE)
2031 raise Exception("Invalid Set(Country,-1) accepted")
2032 except dbus.exceptions.DBusException, e:
2033 if "Error.Failed: wrong property type" not in str(e):
2034 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
2035
2036 try:
2037 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
2038 dbus_interface=dbus.PROPERTIES_IFACE)
2039 raise Exception("Invalid Set(Country,F) accepted")
2040 except dbus.exceptions.DBusException, e:
2041 if "Error.Failed: invalid country code" not in str(e):
2042 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
2043
2044 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
2045 dbus_interface=dbus.PROPERTIES_IFACE)
2046
2047 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2048 if ev is None:
2049 raise Exception("regdom change event not seen")
2050 if "init=CORE type=WORLD" not in ev:
2051 raise Exception("Unexpected event contents: " + ev)
2052
2053def test_dbus_scan_interval(dev, apdev):
2054 """D-Bus Get/Set ScanInterval"""
2055 try:
2056 _test_dbus_scan_interval(dev, apdev)
2057 finally:
2058 dev[0].request("SCAN_INTERVAL 5")
2059
2060def _test_dbus_scan_interval(dev, apdev):
910eb5b5 2061 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2062
2063 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3),
2064 dbus_interface=dbus.PROPERTIES_IFACE)
2065 res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
2066 dbus_interface=dbus.PROPERTIES_IFACE)
2067 if res != 3:
2068 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i))
2069
2070 try:
2071 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
2072 dbus_interface=dbus.PROPERTIES_IFACE)
2073 raise Exception("Invalid Set(ScanInterval,100) accepted")
2074 except dbus.exceptions.DBusException, e:
2075 if "Error.Failed: wrong property type" not in str(e):
2076 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
2077
2078 try:
2079 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
2080 dbus_interface=dbus.PROPERTIES_IFACE)
2081 raise Exception("Invalid Set(ScanInterval,-1) accepted")
2082 except dbus.exceptions.DBusException, e:
2083 if "Error.Failed: scan_interval must be >= 0" not in str(e):
2084 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
2085
2086 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
2087 dbus_interface=dbus.PROPERTIES_IFACE)
2088
2089def test_dbus_probe_req_reporting(dev, apdev):
2090 """D-Bus Probe Request reporting"""
910eb5b5 2091 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2092 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2093
2094 dev[0].p2p_start_go(freq=2412)
2095 dev[1].p2p_find(social=True)
2096
2097 class TestDbusProbe(TestDbus):
2098 def __init__(self, bus):
2099 TestDbus.__init__(self, bus)
2100 self.reported = False
2101
2102 def __enter__(self):
2103 gobject.timeout_add(1, self.run_test)
2104 gobject.timeout_add(15000, self.timeout)
2105 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
2106 byte_arrays=True)
2107 iface.SubscribeProbeReq()
2108 self.loop.run()
2109 return self
2110
2111 def probeRequest(self, args):
2112 logger.debug("probeRequest: args=%s" % str(args))
2113 self.reported = True
2114 self.loop.quit()
2115
2116 def run_test(self, *args):
2117 logger.debug("run_test")
2118 return False
2119
2120 def success(self):
2121 return self.reported
2122
2123 with TestDbusProbe(bus) as t:
2124 if not t.success():
2125 raise Exception("Expected signals not seen")
2126 iface.UnsubscribeProbeReq()
2127
2128 try:
2129 iface.UnsubscribeProbeReq()
2130 raise Exception("Invalid UnsubscribeProbeReq() accepted")
2131 except dbus.exceptions.DBusException, e:
2132 if "NoSubscription" not in str(e):
2133 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
2134
2135 with TestDbusProbe(bus) as t:
2136 if not t.success():
2137 raise Exception("Expected signals not seen")
2138 # On purpose, leave ProbeReq subscription in place to test automatic
2139 # cleanup.
2140
2141 dev[1].p2p_stop_find()
2142 dev[0].remove_group()
2143
0e126c6d
JM
2144def test_dbus_probe_req_reporting_oom(dev, apdev):
2145 """D-Bus Probe Request reporting (OOM)"""
2146 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2147 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2148
fb9adae4
JM
2149 # Need to make sure this process has not already subscribed to avoid false
2150 # failures due to the operation succeeding due to os_strdup() not even
2151 # getting called.
2152 try:
2153 iface.UnsubscribeProbeReq()
2154 was_subscribed = True
2155 except dbus.exceptions.DBusException, e:
2156 was_subscribed = False
2157 pass
2158
0e126c6d
JM
2159 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
2160 "SubscribeProbeReq"):
2161 iface.SubscribeProbeReq()
2162
fb9adae4
JM
2163 if was_subscribed:
2164 # On purpose, leave ProbeReq subscription in place to test automatic
2165 # cleanup.
2166 iface.SubscribeProbeReq()
2167
2ec82e67
JM
2168def test_dbus_p2p_invalid(dev, apdev):
2169 """D-Bus invalid P2P operations"""
910eb5b5 2170 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2171 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2172
2173 try:
2174 p2p.RejectPeer(path + "/Peers/00112233445566")
2175 raise Exception("Invalid RejectPeer accepted")
2176 except dbus.exceptions.DBusException, e:
2177 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2178 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2179
2180 try:
2181 p2p.RejectPeer("/foo")
2182 raise Exception("Invalid RejectPeer accepted")
2183 except dbus.exceptions.DBusException, e:
2184 if "InvalidArgs" not in str(e):
2185 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2186
2187 tests = [ {'DiscoveryType': 'foo'},
2188 {'RequestedDeviceTypes': 'foo'},
2189 {'RequestedDeviceTypes': ['foo']},
2190 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2191 '10','11','12','13','14','15','16',
2192 '17']},
2193 {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2194 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2195 {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2196 {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
2197 dbus.ByteArray('1234567')]},
2198 {'Foo': dbus.Int16(1)},
2199 {'Foo': dbus.UInt16(1)},
2200 {'Foo': dbus.Int64(1)},
2201 {'Foo': dbus.UInt64(1)},
2202 {'Foo': dbus.Double(1.23)},
2203 {'Foo': dbus.Signature('s')},
2204 {'Foo': 'bar'}]
2205 for t in tests:
2206 try:
2207 p2p.Find(dbus.Dictionary(t))
2208 raise Exception("Invalid Find accepted")
2209 except dbus.exceptions.DBusException, e:
2210 if "InvalidArgs" not in str(e):
2211 raise Exception("Unexpected error message for invalid Find(): " + str(e))
2212
2213 for p in [ "/foo",
2214 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2215 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2216 try:
2217 p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2218 raise Exception("Invalid RemovePersistentGroup accepted")
2219 except dbus.exceptions.DBusException, e:
2220 if "InvalidArgs" not in str(e):
2221 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
2222
2223 try:
2224 dev[0].request("P2P_SET disabled 1")
2225 p2p.Listen(5)
2226 raise Exception("Invalid Listen accepted")
2227 except dbus.exceptions.DBusException, e:
2228 if "UnknownError: Could not start P2P listen" not in str(e):
2229 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2230 finally:
2231 dev[0].request("P2P_SET disabled 0")
2232
2233 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2234 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2235 try:
2236 test_p2p.Listen("foo")
2237 raise Exception("Invalid Listen accepted")
2238 except dbus.exceptions.DBusException, e:
2239 if "InvalidArgs" not in str(e):
2240 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2241
2242 try:
2243 dev[0].request("P2P_SET disabled 1")
2244 p2p.ExtendedListen(dbus.Dictionary({}))
2245 raise Exception("Invalid ExtendedListen accepted")
2246 except dbus.exceptions.DBusException, e:
2247 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2248 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2249 finally:
2250 dev[0].request("P2P_SET disabled 0")
2251
2252 try:
2253 dev[0].request("P2P_SET disabled 1")
2254 args = { 'duration1': 30000, 'interval1': 102400,
2255 'duration2': 20000, 'interval2': 102400 }
2256 p2p.PresenceRequest(args)
2257 raise Exception("Invalid PresenceRequest accepted")
2258 except dbus.exceptions.DBusException, e:
2259 if "UnknownError: Failed to invoke presence request" not in str(e):
2260 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2261 finally:
2262 dev[0].request("P2P_SET disabled 0")
2263
2264 try:
2265 params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
2266 p2p.GroupAdd(params)
2267 raise Exception("Invalid GroupAdd accepted")
2268 except dbus.exceptions.DBusException, e:
2269 if "InvalidArgs" not in str(e):
2270 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2271
2272 try:
2273 params = dbus.Dictionary({'persistent_group_object':
2274 dbus.ObjectPath(path),
2275 'frequency': 2412})
2276 p2p.GroupAdd(params)
2277 raise Exception("Invalid GroupAdd accepted")
2278 except dbus.exceptions.DBusException, e:
2279 if "InvalidArgs" not in str(e):
2280 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2281
2282 try:
2283 p2p.Disconnect()
2284 raise Exception("Invalid Disconnect accepted")
2285 except dbus.exceptions.DBusException, e:
2286 if "UnknownError: failed to disconnect" not in str(e):
2287 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
2288
2289 try:
2290 dev[0].request("P2P_SET disabled 1")
2291 p2p.Flush()
2292 raise Exception("Invalid Flush accepted")
2293 except dbus.exceptions.DBusException, e:
2294 if "Error.Failed: P2P is not available for this interface" not in str(e):
2295 raise Exception("Unexpected error message for invalid Flush: " + str(e))
2296 finally:
2297 dev[0].request("P2P_SET disabled 0")
2298
2299 try:
2300 dev[0].request("P2P_SET disabled 1")
2301 args = { 'peer': path,
2302 'join': True,
2303 'wps_method': 'pbc',
2304 'frequency': 2412 }
2305 pin = p2p.Connect(args)
2306 raise Exception("Invalid Connect accepted")
2307 except dbus.exceptions.DBusException, e:
2308 if "Error.Failed: P2P is not available for this interface" not in str(e):
2309 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2310 finally:
2311 dev[0].request("P2P_SET disabled 0")
2312
2313 tests = [ { 'frequency': dbus.Int32(-1) },
2314 { 'wps_method': 'pbc' },
2315 { 'wps_method': 'foo' } ]
2316 for args in tests:
2317 try:
2318 pin = p2p.Connect(args)
2319 raise Exception("Invalid Connect accepted")
2320 except dbus.exceptions.DBusException, e:
2321 if "InvalidArgs" not in str(e):
2322 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2323
2324 try:
2325 dev[0].request("P2P_SET disabled 1")
2326 args = { 'peer': path }
2327 pin = p2p.Invite(args)
2328 raise Exception("Invalid Invite accepted")
2329 except dbus.exceptions.DBusException, e:
2330 if "Error.Failed: P2P is not available for this interface" not in str(e):
2331 raise Exception("Unexpected error message for invalid Invite: " + str(e))
2332 finally:
2333 dev[0].request("P2P_SET disabled 0")
2334
2335 try:
2336 args = { 'foo': 'bar' }
2337 pin = p2p.Invite(args)
2338 raise Exception("Invalid Invite accepted")
2339 except dbus.exceptions.DBusException, e:
2340 if "InvalidArgs" not in str(e):
2341 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2342
2343 tests = [ (path, 'display', "InvalidArgs"),
2344 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2345 'display',
2346 "UnknownError: Failed to send provision discovery request"),
2347 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2348 'keypad',
2349 "UnknownError: Failed to send provision discovery request"),
2350 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2351 'pbc',
2352 "UnknownError: Failed to send provision discovery request"),
2353 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2354 'pushbutton',
2355 "UnknownError: Failed to send provision discovery request"),
2356 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2357 'foo', "InvalidArgs") ]
2358 for (p,method,err) in tests:
2359 try:
2360 p2p.ProvisionDiscoveryRequest(p, method)
2361 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2362 except dbus.exceptions.DBusException, e:
2363 if err not in str(e):
2364 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
2365
2366 try:
2367 dev[0].request("P2P_SET disabled 1")
2368 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2369 dbus_interface=dbus.PROPERTIES_IFACE)
2370 raise Exception("Invalid Get(Peers) accepted")
2371 except dbus.exceptions.DBusException, e:
2372 if "Error.Failed: P2P is not available for this interface" not in str(e):
2373 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
2374 finally:
2375 dev[0].request("P2P_SET disabled 0")
2376
0e126c6d
JM
2377def test_dbus_p2p_oom(dev, apdev):
2378 """D-Bus P2P operations and OOM"""
2379 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2380 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2381
2382 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array",
2383 "Find", "InvalidArgs"):
2384 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2385
2386 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array",
2387 "Find", "InvalidArgs"):
2388 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2389
2390 with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array",
2391 "Find", "InvalidArgs"):
2392 p2p.Find(dbus.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }))
2393
2394 with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
2395 "Find", "InvalidArgs"):
2396 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2397
2398 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
2399 "Find", "InvalidArgs"):
2400 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2401
2402 with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
2403 "Find", "InvalidArgs"):
2404 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'),
2405 dbus.ByteArray('123'),
2406 dbus.ByteArray('123'),
2407 dbus.ByteArray('123'),
2408 dbus.ByteArray('123'),
2409 dbus.ByteArray('123'),
2410 dbus.ByteArray('123'),
2411 dbus.ByteArray('123'),
2412 dbus.ByteArray('123'),
2413 dbus.ByteArray('123'),
2414 dbus.ByteArray('123') ] }))
2415
2416 with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
2417 "Find", "InvalidArgs"):
2418 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2419
2420 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
2421 "Find", "InvalidArgs"):
2422 p2p.Find(dbus.Dictionary({ 'Foo': path }))
2423
2424 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
2425 "AddService", "InvalidArgs"):
2426 args = { 'service_type': 'bonjour',
2427 'response': dbus.ByteArray(500*'b') }
2428 p2p.AddService(args)
2429
2430 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
2431 "AddService", "InvalidArgs"):
2432 p2p.AddService(args)
2433
2ec82e67
JM
2434def test_dbus_p2p_discovery(dev, apdev):
2435 """D-Bus P2P discovery"""
910eb5b5 2436 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2437 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2438
2439 addr0 = dev[0].p2p_dev_addr()
2440
2441 dev[1].request("SET sec_device_type 1-0050F204-2")
2442 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2443 dev[1].p2p_listen()
2444 addr1 = dev[1].p2p_dev_addr()
2445 a1 = binascii.unhexlify(addr1.replace(':',''))
2446
2447 wfd_devinfo = "00001c440028"
2448 dev[2].request("SET wifi_display 1")
2449 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
2450 wfd = binascii.unhexlify('000006' + wfd_devinfo)
2451 dev[2].p2p_listen()
2452 addr2 = dev[2].p2p_dev_addr()
2453 a2 = binascii.unhexlify(addr2.replace(':',''))
2454
2455 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
2456 dbus_interface=dbus.PROPERTIES_IFACE)
2457 if 'Peers' not in res:
2458 raise Exception("GetAll result missing Peers")
2459 if len(res['Peers']) != 0:
2460 raise Exception("Unexpected peer(s) in the list")
2461
2462 args = {'DiscoveryType': 'social',
2463 'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
2464 'Timeout': dbus.Int32(1) }
2465 p2p.Find(dbus.Dictionary(args))
2466 p2p.StopFind()
2467
2468 class TestDbusP2p(TestDbus):
2469 def __init__(self, bus):
2470 TestDbus.__init__(self, bus)
2471 self.found = False
2472 self.found2 = False
2473 self.lost = False
2474
2475 def __enter__(self):
2476 gobject.timeout_add(1, self.run_test)
2477 gobject.timeout_add(15000, self.timeout)
2478 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2479 "DeviceFound")
2480 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
2481 "DeviceLost")
2482 self.add_signal(self.provisionDiscoveryResponseEnterPin,
2483 WPAS_DBUS_IFACE_P2PDEVICE,
2484 "ProvisionDiscoveryResponseEnterPin")
2485 self.loop.run()
2486 return self
2487
2488 def deviceFound(self, path):
2489 logger.debug("deviceFound: path=%s" % path)
2490 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2491 dbus_interface=dbus.PROPERTIES_IFACE)
2492 if len(res) < 1:
2493 raise Exception("Unexpected number of peers")
2494 if path not in res:
2495 raise Exception("Mismatch in peer object path")
2496 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
2497 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
2498 dbus_interface=dbus.PROPERTIES_IFACE,
2499 byte_arrays=True)
2500 logger.debug("peer properties: " + str(res))
2501
2502 if res['DeviceAddress'] == a1:
2503 if 'SecondaryDeviceTypes' not in res:
2504 raise Exception("Missing SecondaryDeviceTypes")
2505 sec = res['SecondaryDeviceTypes']
2506 if len(sec) < 1:
2507 raise Exception("Secondary device type missing")
2508 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
2509 raise Exception("Secondary device type mismatch")
2510
2511 if 'VendorExtension' not in res:
2512 raise Exception("Missing VendorExtension")
2513 vendor = res['VendorExtension']
2514 if len(vendor) < 1:
2515 raise Exception("Vendor extension missing")
2516 if "\x11\x22\x33\x44" not in vendor:
2517 raise Exception("Secondary device type mismatch")
2518
2519 self.found = True
2520 elif res['DeviceAddress'] == a2:
2521 if 'IEs' not in res:
2522 raise Exception("IEs missing")
2523 if res['IEs'] != wfd:
2524 raise Exception("IEs mismatch")
2525 self.found2 = True
2526 else:
2527 raise Exception("Unexpected peer device address")
2528
2529 if self.found and self.found2:
2530 p2p.StopFind()
2531 p2p.RejectPeer(path)
2532 p2p.ProvisionDiscoveryRequest(path, 'display')
2533
2534 def deviceLost(self, path):
2535 logger.debug("deviceLost: path=%s" % path)
2536 self.lost = True
2537 try:
2538 p2p.RejectPeer(path)
2539 raise Exception("Invalid RejectPeer accepted")
2540 except dbus.exceptions.DBusException, e:
2541 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2542 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2543 self.loop.quit()
2544
2545 def provisionDiscoveryResponseEnterPin(self, peer_object):
2546 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
2547 p2p.Flush()
2548
2549 def run_test(self, *args):
2550 logger.debug("run_test")
2551 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
2552 'Timeout': dbus.Int32(10)}))
2553 return False
2554
2555 def success(self):
2556 return self.found and self.lost and self.found2
2557
2558 with TestDbusP2p(bus) as t:
2559 if not t.success():
2560 raise Exception("Expected signals not seen")
2561
2562 dev[1].request("VENDOR_ELEM_REMOVE 1 *")
2563 dev[1].p2p_stop_find()
2564
2565 p2p.Listen(1)
2566 dev[2].p2p_stop_find()
2567 dev[2].request("P2P_FLUSH")
2568 if not dev[2].discover_peer(addr0):
2569 raise Exception("Peer not found")
2570 p2p.StopFind()
2571 dev[2].p2p_stop_find()
2572
2573 try:
2574 p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
2575 raise Exception("Invalid ExtendedListen accepted")
2576 except dbus.exceptions.DBusException, e:
2577 if "InvalidArgs" not in str(e):
2578 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
2579
2580 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
2581 p2p.ExtendedListen(dbus.Dictionary({}))
2582 dev[0].global_request("P2P_EXT_LISTEN")
2583
2584def test_dbus_p2p_service_discovery(dev, apdev):
2585 """D-Bus P2P service discovery"""
910eb5b5 2586 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2587 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2588
2589 addr0 = dev[0].p2p_dev_addr()
2590 addr1 = dev[1].p2p_dev_addr()
2591
2592 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
2593 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
2594
2595 args = { 'service_type': 'bonjour',
2596 'query': bonjour_query,
2597 'response': bonjour_response }
2598 p2p.AddService(args)
2599 p2p.FlushService()
2600 p2p.AddService(args)
2601
2602 try:
2603 p2p.DeleteService(args)
2604 raise Exception("Invalid DeleteService() accepted")
2605 except dbus.exceptions.DBusException, e:
2606 if "InvalidArgs" not in str(e):
2607 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2608
2609 args = { 'service_type': 'bonjour',
2610 'query': bonjour_query }
2611 p2p.DeleteService(args)
2612 try:
2613 p2p.DeleteService(args)
2614 raise Exception("Invalid DeleteService() accepted")
2615 except dbus.exceptions.DBusException, e:
2616 if "InvalidArgs" not in str(e):
2617 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2618
2619 args = { 'service_type': 'upnp',
2620 'version': 0x10,
2621 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
2622 p2p.AddService(args)
2623 p2p.DeleteService(args)
2624 try:
2625 p2p.DeleteService(args)
2626 raise Exception("Invalid DeleteService() accepted")
2627 except dbus.exceptions.DBusException, e:
2628 if "InvalidArgs" not in str(e):
2629 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2630
2631 tests = [ { 'service_type': 'foo' },
2632 { 'service_type': 'foo', 'query': bonjour_query },
2633 { 'service_type': 'upnp' },
2634 { 'service_type': 'upnp', 'version': 0x10 },
2635 { 'service_type': 'upnp',
2636 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2637 { 'version': 0x10,
2638 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2639 { 'service_type': 'upnp', 'foo': 'bar' },
2640 { 'service_type': 'bonjour' },
2641 { 'service_type': 'bonjour', 'query': 'foo' },
2642 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2643 for args in tests:
2644 try:
2645 p2p.DeleteService(args)
2646 raise Exception("Invalid DeleteService() accepted")
2647 except dbus.exceptions.DBusException, e:
2648 if "InvalidArgs" not in str(e):
2649 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2650
2651 tests = [ { 'service_type': 'foo' },
2652 { 'service_type': 'upnp' },
2653 { 'service_type': 'upnp', 'version': 0x10 },
2654 { 'service_type': 'upnp',
2655 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2656 { 'version': 0x10,
2657 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2658 { 'service_type': 'upnp', 'foo': 'bar' },
2659 { 'service_type': 'bonjour' },
2660 { 'service_type': 'bonjour', 'query': 'foo' },
2661 { 'service_type': 'bonjour', 'response': 'foo' },
2662 { 'service_type': 'bonjour', 'query': bonjour_query },
2663 { 'service_type': 'bonjour', 'response': bonjour_response },
2664 { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
2665 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2666 for args in tests:
2667 try:
2668 p2p.AddService(args)
2669 raise Exception("Invalid AddService() accepted")
2670 except dbus.exceptions.DBusException, e:
2671 if "InvalidArgs" not in str(e):
2672 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2673
2674 args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
2675 ref = p2p.ServiceDiscoveryRequest(args)
2676 p2p.ServiceDiscoveryCancelRequest(ref)
2677 try:
2678 p2p.ServiceDiscoveryCancelRequest(ref)
2679 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2680 except dbus.exceptions.DBusException, e:
2681 if "InvalidArgs" not in str(e):
2682 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2683 try:
2684 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
2685 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2686 except dbus.exceptions.DBusException, e:
2687 if "InvalidArgs" not in str(e):
2688 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2689
2690 args = { 'service_type': 'upnp',
2691 'version': 0x10,
2692 'service': 'ssdp:foo' }
2693 ref = p2p.ServiceDiscoveryRequest(args)
2694 p2p.ServiceDiscoveryCancelRequest(ref)
2695
2696 tests = [ { 'service_type': 'foo' },
2697 { 'foo': 'bar' },
2698 { 'tlv': 'foo' },
2699 { },
2700 { 'version': 0 },
2701 { 'service_type': 'upnp',
2702 'service': 'ssdp:foo' },
2703 { 'service_type': 'upnp',
2704 'version': 0x10 },
2705 { 'service_type': 'upnp',
2706 'version': 0x10,
2707 'service': 'ssdp:foo',
2708 'peer_object': dbus.ObjectPath(path + "/Peers") },
2709 { 'service_type': 'upnp',
2710 'version': 0x10,
2711 'service': 'ssdp:foo',
2712 'peer_object': path + "/Peers" },
2713 { 'service_type': 'upnp',
2714 'version': 0x10,
2715 'service': 'ssdp:foo',
2716 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
2717 for args in tests:
2718 try:
2719 p2p.ServiceDiscoveryRequest(args)
2720 raise Exception("Invalid ServiceDiscoveryRequest accepted")
2721 except dbus.exceptions.DBusException, e:
2722 if "InvalidArgs" not in str(e):
2723 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))
2724
2725 args = { 'foo': 'bar' }
2726 try:
2727 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
2728 raise Exception("Invalid ServiceDiscoveryResponse accepted")
2729 except dbus.exceptions.DBusException, e:
2730 if "InvalidArgs" not in str(e):
2731 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
2732
2733def test_dbus_p2p_service_discovery_query(dev, apdev):
2734 """D-Bus P2P service discovery query"""
910eb5b5 2735 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2736 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2737
2738 addr0 = dev[0].p2p_dev_addr()
2739 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
2740 dev[1].p2p_listen()
2741 addr1 = dev[1].p2p_dev_addr()
2742
2743 class TestDbusP2p(TestDbus):
2744 def __init__(self, bus):
2745 TestDbus.__init__(self, bus)
2746 self.done = False
2747
2748 def __enter__(self):
2749 gobject.timeout_add(1, self.run_test)
2750 gobject.timeout_add(15000, self.timeout)
2751 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2752 "DeviceFound")
2753 self.add_signal(self.serviceDiscoveryResponse,
2754 WPAS_DBUS_IFACE_P2PDEVICE,
2755 "ServiceDiscoveryResponse", byte_arrays=True)
2756 self.loop.run()
2757 return self
2758
2759 def deviceFound(self, path):
2760 logger.debug("deviceFound: path=%s" % path)
2761 args = { 'peer_object': path,
2762 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
2763 p2p.ServiceDiscoveryRequest(args)
2764
2765 def serviceDiscoveryResponse(self, sd_request):
2766 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
2767 self.done = True
2768 self.loop.quit()
2769
2770 def run_test(self, *args):
2771 logger.debug("run_test")
2772 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
2773 'Timeout': dbus.Int32(10)}))
2774 return False
2775
2776 def success(self):
2777 return self.done
2778
2779 with TestDbusP2p(bus) as t:
2780 if not t.success():
2781 raise Exception("Expected signals not seen")
2782
2783 dev[1].p2p_stop_find()
2784
2785def test_dbus_p2p_service_discovery_external(dev, apdev):
2786 """D-Bus P2P service discovery with external response"""
2787 try:
2788 _test_dbus_p2p_service_discovery_external(dev, apdev)
2789 finally:
2790 dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
2791
2792def _test_dbus_p2p_service_discovery_external(dev, apdev):
910eb5b5 2793 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2794 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2795
2796 addr0 = dev[0].p2p_dev_addr()
2797 addr1 = dev[1].p2p_dev_addr()
2798 resp = "0300000101"
2799
2800 dev[1].request("P2P_FLUSH")
2801 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
2802 dev[1].p2p_find(social=True)
2803
2804 class TestDbusP2p(TestDbus):
2805 def __init__(self, bus):
2806 TestDbus.__init__(self, bus)
2807 self.sd = False
2808
2809 def __enter__(self):
2810 gobject.timeout_add(1, self.run_test)
2811 gobject.timeout_add(15000, self.timeout)
2812 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2813 "DeviceFound")
2814 self.add_signal(self.serviceDiscoveryRequest,
2815 WPAS_DBUS_IFACE_P2PDEVICE,
2816 "ServiceDiscoveryRequest")
2817 self.loop.run()
2818 return self
2819
2820 def deviceFound(self, path):
2821 logger.debug("deviceFound: path=%s" % path)
2822
2823 def serviceDiscoveryRequest(self, sd_request):
2824 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
2825 self.sd = True
2826 args = { 'peer_object': sd_request['peer_object'],
2827 'frequency': sd_request['frequency'],
2828 'dialog_token': sd_request['dialog_token'],
2829 'tlvs': dbus.ByteArray(binascii.unhexlify(resp)) }
2830 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
2831 self.loop.quit()
2832
2833 def run_test(self, *args):
2834 logger.debug("run_test")
2835 p2p.ServiceDiscoveryExternal(1)
2836 p2p.ServiceUpdate()
2837 p2p.Listen(15)
2838 return False
2839
2840 def success(self):
2841 return self.sd
2842
2843 with TestDbusP2p(bus) as t:
2844 if not t.success():
2845 raise Exception("Expected signals not seen")
2846
2847 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
2848 if ev is None:
2849 raise Exception("Service discovery timed out")
2850 if addr0 not in ev:
2851 raise Exception("Unexpected address in SD Response: " + ev)
2852 if ev.split(' ')[4] != resp:
2853 raise Exception("Unexpected response data SD Response: " + ev)
2854 dev[1].p2p_stop_find()
2855
2856 p2p.StopFind()
2857 p2p.ServiceDiscoveryExternal(0)
2858
2859def test_dbus_p2p_autogo(dev, apdev):
2860 """D-Bus P2P autonomous GO"""
910eb5b5 2861 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2862 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2863 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
2864
2865 addr0 = dev[0].p2p_dev_addr()
2866
2867 class TestDbusP2p(TestDbus):
2868 def __init__(self, bus):
2869 TestDbus.__init__(self, bus)
2870 self.first = True
2871 self.waiting_end = False
2872 self.done = False
2873
2874 def __enter__(self):
2875 gobject.timeout_add(1, self.run_test)
2876 gobject.timeout_add(15000, self.timeout)
2877 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2878 "DeviceFound")
2879 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
2880 "GroupStarted")
2881 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
2882 "GroupFinished")
2883 self.add_signal(self.persistentGroupAdded,
2884 WPAS_DBUS_IFACE_P2PDEVICE,
2885 "PersistentGroupAdded")
2886 self.add_signal(self.persistentGroupRemoved,
2887 WPAS_DBUS_IFACE_P2PDEVICE,
2888 "PersistentGroupRemoved")
2889 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
2890 WPAS_DBUS_IFACE_P2PDEVICE,
2891 "ProvisionDiscoveryRequestDisplayPin")
2892 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
2893 "StaAuthorized")
2894 self.loop.run()
2895 return self
2896
2897 def groupStarted(self, properties):
2898 logger.debug("groupStarted: " + str(properties))
2899 self.group = properties['group_object']
2900 role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
2901 dbus_interface=dbus.PROPERTIES_IFACE)
2902 if role != "GO":
2903 raise Exception("Unexpected role reported: " + role)
2904 group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
2905 dbus_interface=dbus.PROPERTIES_IFACE)
2906 if group != properties['group_object']:
2907 raise Exception("Unexpected Group reported: " + str(group))
2908 go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
2909 dbus_interface=dbus.PROPERTIES_IFACE)
2910 if go != '/':
2911 raise Exception("Unexpected PeerGO value: " + str(go))
2912 if self.first:
2913 self.first = False
2914 logger.info("Remove persistent group instance")
2915 p2p.Disconnect()
2916 else:
2917 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
2918 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
2919
2920 def groupFinished(self, properties):
2921 logger.debug("groupFinished: " + str(properties))
2922 if self.waiting_end:
2923 logger.info("Remove persistent group")
2924 p2p.RemovePersistentGroup(self.persistent)
2925 else:
2926 logger.info("Re-start persistent group")
2927 params = dbus.Dictionary({'persistent_group_object':
2928 self.persistent,
2929 'frequency': 2412})
2930 p2p.GroupAdd(params)
2931
2932 def persistentGroupAdded(self, path, properties):
2933 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
2934 self.persistent = path
2935
2936 def persistentGroupRemoved(self, path):
2937 logger.debug("persistentGroupRemoved: %s" % path)
2938 self.done = True
2939 self.loop.quit()
2940
2941 def deviceFound(self, path):
2942 logger.debug("deviceFound: path=%s" % path)
2943 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
2944 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
2945 dbus_interface=dbus.PROPERTIES_IFACE,
2946 byte_arrays=True)
2947 logger.debug('peer properties: ' + str(self.peer))
2948
2949 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
2950 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
2951 self.peer_path = peer_object
2952 peer = binascii.unhexlify(peer_object.split('/')[-1])
2953 addr = ""
2954 for p in peer:
2955 if len(addr) > 0:
2956 addr += ':'
2957 addr += '%02x' % ord(p)
795b6f57
JM
2958
2959 params = { 'Role': 'registrar',
2960 'P2PDeviceAddress': self.peer['DeviceAddress'],
2961 'Bssid': self.peer['DeviceAddress'],
2962 'Type': 'pin' }
2963 try:
2964 wps.Start(params)
2965 raise Exception("Invalid WPS.Start() accepted")
2966 except dbus.exceptions.DBusException, e:
2967 if "InvalidArgs" not in str(e):
2968 raise Exception("Unexpected error message: " + str(e))
2ec82e67
JM
2969 params = { 'Role': 'registrar',
2970 'P2PDeviceAddress': self.peer['DeviceAddress'],
2971 'Bssid': self.peer['DeviceAddress'],
2972 'Type': 'pin',
2973 'Pin': '12345670' }
2974 logger.info("Authorize peer to connect to the group")
2975 wps.Start(params)
2976
2977 def staAuthorized(self, name):
2978 logger.debug("staAuthorized: " + name)
2979 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
2980 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
2981 dbus_interface=dbus.PROPERTIES_IFACE,
2982 byte_arrays=True)
2983 if 'Groups' not in res or len(res['Groups']) != 1:
2984 raise Exception("Unexpected number of peer Groups entries")
2985 if res['Groups'][0] != self.group:
2986 raise Exception("Unexpected peer Groups[0] value")
2987
2988 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
2989 res = g_obj.GetAll(WPAS_DBUS_GROUP,
2990 dbus_interface=dbus.PROPERTIES_IFACE,
2991 byte_arrays=True)
2992 logger.debug("Group properties: " + str(res))
2993 if 'Members' not in res or len(res['Members']) != 1:
2994 raise Exception("Unexpected number of group members")
2995
2996 ext = dbus.ByteArray("\x11\x22\x33\x44")
2997 # Earlier implementation of this interface was a bit strange. The
2998 # property is defined to have aay signature and that is what the
2999 # getter returned. However, the setter expected there to be a
3000 # dictionary with 'WPSVendorExtensions' as the key surrounding these
3001 # values.. The current implementations maintains support for that
3002 # for backwards compability reasons. Verify that encoding first.
3003 vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
3004 signature='sv')
3005 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3006 dbus_interface=dbus.PROPERTIES_IFACE)
3007 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3008 dbus_interface=dbus.PROPERTIES_IFACE,
3009 byte_arrays=True)
3010 if len(res) != 1:
3011 raise Exception("Unexpected number of vendor extensions")
3012 if res[0] != ext:
3013 raise Exception("Vendor extension value changed")
3014
3015 # And now verify that the more appropriate encoding is accepted as
3016 # well.
3017 res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
3018 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3019 dbus_interface=dbus.PROPERTIES_IFACE)
3020 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3021 dbus_interface=dbus.PROPERTIES_IFACE,
3022 byte_arrays=True)
3023 if len(res) != 2:
3024 raise Exception("Unexpected number of vendor extensions")
3025 if res[0] != res2[0] or res[1] != res2[1]:
3026 raise Exception("Vendor extension value changed")
3027
3028 for i in range(10):
3029 res.append(dbus.ByteArray('\xaa\xbb'))
3030 try:
3031 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3032 dbus_interface=dbus.PROPERTIES_IFACE)
3033 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3034 except dbus.exceptions.DBusException, e:
3035 if "Error.Failed" not in str(e):
3036 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3037
3038 vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
3039 try:
3040 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3041 dbus_interface=dbus.PROPERTIES_IFACE)
3042 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3043 except dbus.exceptions.DBusException, e:
3044 if "InvalidArgs" not in str(e):
3045 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3046
3047 vals = [ "foo" ]
3048 try:
3049 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3050 dbus_interface=dbus.PROPERTIES_IFACE)
3051 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3052 except dbus.exceptions.DBusException, e:
3053 if "Error.Failed" not in str(e):
3054 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3055
3056 vals = [ [ "foo" ] ]
3057 try:
3058 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3059 dbus_interface=dbus.PROPERTIES_IFACE)
3060 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3061 except dbus.exceptions.DBusException, e:
3062 if "Error.Failed" not in str(e):
3063 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3064
3065 self.waiting_end = True
3066 p2p.Disconnect()
3067
3068 def run_test(self, *args):
3069 logger.debug("run_test")
3070 params = dbus.Dictionary({'persistent': True,
3071 'frequency': 2412})
3072 logger.info("Add a persistent group")
3073 p2p.GroupAdd(params)
3074 return False
3075
3076 def success(self):
3077 return self.done
3078
3079 with TestDbusP2p(bus) as t:
3080 if not t.success():
3081 raise Exception("Expected signals not seen")
3082
3083 dev[1].wait_go_ending_session()
3084
3085def test_dbus_p2p_autogo_pbc(dev, apdev):
3086 """D-Bus P2P autonomous GO and PBC"""
910eb5b5 3087 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3088 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3089 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
3090
3091 addr0 = dev[0].p2p_dev_addr()
3092
3093 class TestDbusP2p(TestDbus):
3094 def __init__(self, bus):
3095 TestDbus.__init__(self, bus)
3096 self.first = True
3097 self.waiting_end = False
3098 self.done = False
3099
3100 def __enter__(self):
3101 gobject.timeout_add(1, self.run_test)
3102 gobject.timeout_add(15000, self.timeout)
3103 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3104 "DeviceFound")
3105 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3106 "GroupStarted")
3107 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3108 "GroupFinished")
3109 self.add_signal(self.provisionDiscoveryPBCRequest,
3110 WPAS_DBUS_IFACE_P2PDEVICE,
3111 "ProvisionDiscoveryPBCRequest")
3112 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3113 "StaAuthorized")
3114 self.loop.run()
3115 return self
3116
3117 def groupStarted(self, properties):
3118 logger.debug("groupStarted: " + str(properties))
3119 self.group = properties['group_object']
3120 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3121 dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
3122
3123 def groupFinished(self, properties):
3124 logger.debug("groupFinished: " + str(properties))
3125 self.done = True
3126 self.loop.quit()
3127
3128 def deviceFound(self, path):
3129 logger.debug("deviceFound: path=%s" % path)
3130 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3131 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3132 dbus_interface=dbus.PROPERTIES_IFACE,
3133 byte_arrays=True)
3134 logger.debug('peer properties: ' + str(self.peer))
3135
3136 def provisionDiscoveryPBCRequest(self, peer_object):
3137 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
3138 self.peer_path = peer_object
3139 peer = binascii.unhexlify(peer_object.split('/')[-1])
3140 addr = ""
3141 for p in peer:
3142 if len(addr) > 0:
3143 addr += ':'
3144 addr += '%02x' % ord(p)
3145 params = { 'Role': 'registrar',
3146 'P2PDeviceAddress': self.peer['DeviceAddress'],
3147 'Bssid': self.peer['DeviceAddress'],
3148 'Type': 'pbc' }
3149 logger.info("Authorize peer to connect to the group")
3150 wps.Start(params)
3151
3152 def staAuthorized(self, name):
3153 logger.debug("staAuthorized: " + name)
3154 p2p.Disconnect()
3155
3156 def run_test(self, *args):
3157 logger.debug("run_test")
3158 params = dbus.Dictionary({'frequency': 2412})
3159 p2p.GroupAdd(params)
3160 return False
3161
3162 def success(self):
3163 return self.done
3164
3165 with TestDbusP2p(bus) as t:
3166 if not t.success():
3167 raise Exception("Expected signals not seen")
3168
3169 dev[1].wait_go_ending_session()
3170 dev[1].flush_scan_cache()
3171
3172def test_dbus_p2p_autogo_legacy(dev, apdev):
3173 """D-Bus P2P autonomous GO and legacy STA"""
910eb5b5 3174 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3175 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3176 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
3177
3178 addr0 = dev[0].p2p_dev_addr()
3179
3180 class TestDbusP2p(TestDbus):
3181 def __init__(self, bus):
3182 TestDbus.__init__(self, bus)
3183 self.done = False
3184
3185 def __enter__(self):
3186 gobject.timeout_add(1, self.run_test)
3187 gobject.timeout_add(15000, self.timeout)
3188 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3189 "GroupStarted")
3190 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3191 "GroupFinished")
3192 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3193 "StaAuthorized")
3194 self.loop.run()
3195 return self
3196
3197 def groupStarted(self, properties):
3198 logger.debug("groupStarted: " + str(properties))
3199 pin = '12345670'
3200 params = { 'Role': 'enrollee',
3201 'Type': 'pin',
3202 'Pin': pin }
3203 wps.Start(params)
3204 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3205 dev1.scan_for_bss(addr0, freq=2412)
3206 dev1.request("WPS_PIN " + addr0 + " " + pin)
3207
3208 def groupFinished(self, properties):
3209 logger.debug("groupFinished: " + str(properties))
3210 self.done = True
3211 self.loop.quit()
3212
3213 def staAuthorized(self, name):
3214 logger.debug("staAuthorized: " + name)
3215 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3216 dev1.request("DISCONNECT")
3217 p2p.Disconnect()
3218
3219 def run_test(self, *args):
3220 logger.debug("run_test")
3221 params = dbus.Dictionary({'frequency': 2412})
3222 p2p.GroupAdd(params)
3223 return False
3224
3225 def success(self):
3226 return self.done
3227
3228 with TestDbusP2p(bus) as t:
3229 if not t.success():
3230 raise Exception("Expected signals not seen")
3231
3232def test_dbus_p2p_join(dev, apdev):
3233 """D-Bus P2P join an autonomous GO"""
910eb5b5 3234 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3235 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3236
3237 addr1 = dev[1].p2p_dev_addr()
3238 addr2 = dev[2].p2p_dev_addr()
3239 dev[1].p2p_start_go(freq=2412)
3240 dev[2].p2p_listen()
3241
3242 class TestDbusP2p(TestDbus):
3243 def __init__(self, bus):
3244 TestDbus.__init__(self, bus)
3245 self.done = False
3246 self.peer = None
3247 self.go = None
3248
3249 def __enter__(self):
3250 gobject.timeout_add(1, self.run_test)
3251 gobject.timeout_add(15000, self.timeout)
3252 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3253 "DeviceFound")
3254 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3255 "GroupStarted")
3256 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3257 "GroupFinished")
3258 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
3259 "InvitationResult")
3260 self.loop.run()
3261 return self
3262
3263 def deviceFound(self, path):
3264 logger.debug("deviceFound: path=%s" % path)
3265 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3266 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3267 dbus_interface=dbus.PROPERTIES_IFACE,
3268 byte_arrays=True)
3269 logger.debug('peer properties: ' + str(res))
3270 if addr2.replace(':','') in path:
3271 self.peer = path
3272 elif addr1.replace(':','') in path:
3273 self.go = path
3274 if self.peer and self.go:
3275 logger.info("Join the group")
3276 p2p.StopFind()
3277 args = { 'peer': self.go,
3278 'join': True,
3279 'wps_method': 'pin',
3280 'frequency': 2412 }
3281 pin = p2p.Connect(args)
3282
3283 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3284 dev1.request("WPS_PIN any " + pin)
3285
3286 def groupStarted(self, properties):
3287 logger.debug("groupStarted: " + str(properties))
3288 role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3289 dbus_interface=dbus.PROPERTIES_IFACE)
3290 if role != "client":
3291 raise Exception("Unexpected role reported: " + role)
3292 group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3293 dbus_interface=dbus.PROPERTIES_IFACE)
3294 if group != properties['group_object']:
3295 raise Exception("Unexpected Group reported: " + str(group))
3296 go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3297 dbus_interface=dbus.PROPERTIES_IFACE)
3298 if go != self.go:
3299 raise Exception("Unexpected PeerGO value: " + str(go))
3300
3301 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3302 properties['group_object'])
3303 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3304 dbus_interface=dbus.PROPERTIES_IFACE,
3305 byte_arrays=True)
3306 logger.debug("Group properties: " + str(res))
3307
3308 ext = dbus.ByteArray("\x11\x22\x33\x44")
3309 try:
3310 # Set(WPSVendorExtensions) not allowed for P2P Client
3311 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3312 dbus_interface=dbus.PROPERTIES_IFACE)
3313 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3314 except dbus.exceptions.DBusException, e:
3315 if "Error.Failed: Failed to set property" not in str(e):
3316 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3317
3318 args = { 'duration1': 30000, 'interval1': 102400,
3319 'duration2': 20000, 'interval2': 102400 }
3320 p2p.PresenceRequest(args)
3321
3322 args = { 'peer': self.peer }
3323 p2p.Invite(args)
3324
3325 def groupFinished(self, properties):
3326 logger.debug("groupFinished: " + str(properties))
3327 self.done = True
3328 self.loop.quit()
3329
3330 def invitationResult(self, result):
3331 logger.debug("invitationResult: " + str(result))
3332 if result['status'] != 1:
3333 raise Exception("Unexpected invitation result: " + str(result))
3334 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3335 dev1.remove_group()
3336
3337 def run_test(self, *args):
3338 logger.debug("run_test")
3339 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
3340 return False
3341
3342 def success(self):
3343 return self.done
3344
3345 with TestDbusP2p(bus) as t:
3346 if not t.success():
3347 raise Exception("Expected signals not seen")
3348
3349 dev[2].p2p_stop_find()
3350
3351def test_dbus_p2p_config(dev, apdev):
3352 """D-Bus Get/Set P2PDeviceConfig"""
3353 try:
3354 _test_dbus_p2p_config(dev, apdev)
3355 finally:
3356 dev[0].request("P2P_SET ssid_postfix ")
3357
3358def _test_dbus_p2p_config(dev, apdev):
910eb5b5 3359 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3360 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3361
3362 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3363 dbus_interface=dbus.PROPERTIES_IFACE,
3364 byte_arrays=True)
3365 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
3366 dbus_interface=dbus.PROPERTIES_IFACE)
3367 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3368 dbus_interface=dbus.PROPERTIES_IFACE,
3369 byte_arrays=True)
3370
3371 if len(res) != len(res2):
3372 raise Exception("Different number of parameters")
3373 for k in res:
3374 if res[k] != res2[k]:
3375 raise Exception("Parameter %s value changes" % k)
3376
3377 changes = { 'SsidPostfix': 'foo',
3378 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
3379 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
3380 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3381 dbus.Dictionary(changes, signature='sv'),
3382 dbus_interface=dbus.PROPERTIES_IFACE)
3383
3384 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3385 dbus_interface=dbus.PROPERTIES_IFACE,
3386 byte_arrays=True)
3387 logger.debug("P2PDeviceConfig: " + str(res2))
3388 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
3389 raise Exception("VendorExtension does not match")
3390 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
3391 raise Exception("SecondaryDeviceType does not match")
3392
3393 changes = { 'SsidPostfix': '',
3394 'VendorExtension': dbus.Array([], signature="ay"),
3395 'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
3396 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3397 dbus.Dictionary(changes, signature='sv'),
3398 dbus_interface=dbus.PROPERTIES_IFACE)
3399
3400 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3401 dbus_interface=dbus.PROPERTIES_IFACE,
3402 byte_arrays=True)
3403 logger.debug("P2PDeviceConfig: " + str(res3))
3404 if 'VendorExtension' in res3:
3405 raise Exception("VendorExtension not removed")
3406 if 'SecondaryDeviceTypes' in res3:
3407 raise Exception("SecondaryDeviceType not removed")
3408
3409 try:
3410 dev[0].request("P2P_SET disabled 1")
3411 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3412 dbus_interface=dbus.PROPERTIES_IFACE,
3413 byte_arrays=True)
3414 raise Exception("Invalid Get(P2PDeviceConfig) accepted")
3415 except dbus.exceptions.DBusException, e:
3416 if "Error.Failed: P2P is not available for this interface" not in str(e):
3417 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3418 finally:
3419 dev[0].request("P2P_SET disabled 0")
3420
3421 try:
3422 dev[0].request("P2P_SET disabled 1")
3423 changes = { 'SsidPostfix': 'foo' }
3424 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3425 dbus.Dictionary(changes, signature='sv'),
3426 dbus_interface=dbus.PROPERTIES_IFACE)
3427 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
3428 except dbus.exceptions.DBusException, e:
3429 if "Error.Failed: P2P is not available for this interface" not in str(e):
3430 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3431 finally:
3432 dev[0].request("P2P_SET disabled 0")
3433
3434 tests = [ { 'DeviceName': 123 },
3435 { 'SsidPostfix': 123 },
3436 { 'Foo': 'Bar' } ]
3437 for changes in tests:
3438 try:
3439 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3440 dbus.Dictionary(changes, signature='sv'),
3441 dbus_interface=dbus.PROPERTIES_IFACE)
3442 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
3443 except dbus.exceptions.DBusException, e:
3444 if "InvalidArgs" not in str(e):
3445 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3446
3447def test_dbus_p2p_persistent(dev, apdev):
3448 """D-Bus P2P persistent group"""
910eb5b5 3449 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3450 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3451
3452 class TestDbusP2p(TestDbus):
3453 def __init__(self, bus):
3454 TestDbus.__init__(self, bus)
3455
3456 def __enter__(self):
3457 gobject.timeout_add(1, self.run_test)
3458 gobject.timeout_add(15000, self.timeout)
3459 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3460 "GroupStarted")
3461 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3462 "GroupFinished")
3463 self.add_signal(self.persistentGroupAdded,
3464 WPAS_DBUS_IFACE_P2PDEVICE,
3465 "PersistentGroupAdded")
3466 self.loop.run()
3467 return self
3468
3469 def groupStarted(self, properties):
3470 logger.debug("groupStarted: " + str(properties))
3471 p2p.Disconnect()
3472
3473 def groupFinished(self, properties):
3474 logger.debug("groupFinished: " + str(properties))
3475 self.loop.quit()
3476
3477 def persistentGroupAdded(self, path, properties):
3478 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3479 self.persistent = path
3480
3481 def run_test(self, *args):
3482 logger.debug("run_test")
3483 params = dbus.Dictionary({'persistent': True,
3484 'frequency': 2412})
3485 logger.info("Add a persistent group")
3486 p2p.GroupAdd(params)
3487 return False
3488
3489 def success(self):
3490 return True
3491
3492 with TestDbusP2p(bus) as t:
3493 if not t.success():
3494 raise Exception("Expected signals not seen")
3495 persistent = t.persistent
3496
3497 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
3498 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
3499 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
3500 logger.info("Persistent group Properties: " + str(res))
3501 vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
3502 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
3503 dbus_interface=dbus.PROPERTIES_IFACE)
3504 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
3505 dbus_interface=dbus.PROPERTIES_IFACE)
3506 if len(res) != len(res2):
3507 raise Exception("Different number of parameters")
3508 for k in res:
3509 if k != 'ssid' and res[k] != res2[k]:
3510 raise Exception("Parameter %s value changes" % k)
3511 if res2['ssid'] != '"DIRECT-foo"':
3512 raise Exception("Unexpected ssid")
3513
3514 args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
3515 'psk': '1234567890' }, signature='sv')
3516 group = p2p.AddPersistentGroup(args)
3517
3518 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
3519 dbus_interface=dbus.PROPERTIES_IFACE)
3520 if len(groups) != 2:
3521 raise Exception("Unexpected number of persistent groups: " + str(groups))
3522
3523 p2p.RemoveAllPersistentGroups()
3524
3525 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
3526 dbus_interface=dbus.PROPERTIES_IFACE)
3527 if len(groups) != 0:
3528 raise Exception("Unexpected number of persistent groups: " + str(groups))
3529
3530 try:
3531 p2p.RemovePersistentGroup(persistent)
3532 raise Exception("Invalid RemovePersistentGroup accepted")
3533 except dbus.exceptions.DBusException, e:
3534 if "NetworkUnknown: There is no such persistent group" not in str(e):
3535 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
3536
3537def test_dbus_p2p_reinvoke_persistent(dev, apdev):
3538 """D-Bus P2P reinvoke persistent group"""
910eb5b5 3539 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3540 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3541 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
3542
3543 addr0 = dev[0].p2p_dev_addr()
3544
3545 class TestDbusP2p(TestDbus):
3546 def __init__(self, bus):
3547 TestDbus.__init__(self, bus)
3548 self.first = True
3549 self.waiting_end = False
3550 self.done = False
3551 self.invited = False
3552
3553 def __enter__(self):
3554 gobject.timeout_add(1, self.run_test)
3555 gobject.timeout_add(15000, self.timeout)
3556 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3557 "DeviceFound")
3558 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3559 "GroupStarted")
3560 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3561 "GroupFinished")
3562 self.add_signal(self.persistentGroupAdded,
3563 WPAS_DBUS_IFACE_P2PDEVICE,
3564 "PersistentGroupAdded")
3565 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3566 WPAS_DBUS_IFACE_P2PDEVICE,
3567 "ProvisionDiscoveryRequestDisplayPin")
3568 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3569 "StaAuthorized")
3570 self.loop.run()
3571 return self
3572
3573 def groupStarted(self, properties):
3574 logger.debug("groupStarted: " + str(properties))
3575 if not self.invited:
3576 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3577 dev1.scan_for_bss(addr0, freq=2412)
3578 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
3579
3580 def groupFinished(self, properties):
3581 logger.debug("groupFinished: " + str(properties))
3582 if self.invited:
3583 self.done = True
3584 self.loop.quit()
3585 else:
3586 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3587 dev1.request("SET persistent_reconnect 1")
3588 dev1.p2p_listen()
3589
3590 args = { 'persistent_group_object': dbus.ObjectPath(path),
3591 'peer': self.peer_path }
3592 try:
3593 pin = p2p.Invite(args)
3594 raise Exception("Invalid Invite accepted")
3595 except dbus.exceptions.DBusException, e:
3596 if "InvalidArgs" not in str(e):
3597 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3598
3599 args = { 'persistent_group_object': self.persistent,
3600 'peer': self.peer_path }
3601 pin = p2p.Invite(args)
3602 self.invited = True
3603
3604 def persistentGroupAdded(self, path, properties):
3605 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3606 self.persistent = path
3607
3608 def deviceFound(self, path):
3609 logger.debug("deviceFound: path=%s" % path)
3610 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3611 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3612 dbus_interface=dbus.PROPERTIES_IFACE,
3613 byte_arrays=True)
3614
3615 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3616 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3617 self.peer_path = peer_object
3618 peer = binascii.unhexlify(peer_object.split('/')[-1])
3619 addr = ""
3620 for p in peer:
3621 if len(addr) > 0:
3622 addr += ':'
3623 addr += '%02x' % ord(p)
3624 params = { 'Role': 'registrar',
3625 'P2PDeviceAddress': self.peer['DeviceAddress'],
3626 'Bssid': self.peer['DeviceAddress'],
3627 'Type': 'pin',
3628 'Pin': '12345670' }
3629 logger.info("Authorize peer to connect to the group")
3630 wps.Start(params)
3631
3632 def staAuthorized(self, name):
3633 logger.debug("staAuthorized: " + name)
3634 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3635 dev1.remove_group()
3636 ev = dev1.wait_event(["P2P-GROUP-REMOVED"], timeout=10)
3637 if ev is None:
3638 raise Exception("Group removal timed out")
3639 p2p.Disconnect()
3640
3641 def run_test(self, *args):
3642 logger.debug("run_test")
3643 params = dbus.Dictionary({'persistent': True,
3644 'frequency': 2412})
3645 logger.info("Add a persistent group")
3646 p2p.GroupAdd(params)
3647 return False
3648
3649 def success(self):
3650 return self.done
3651
3652 with TestDbusP2p(bus) as t:
3653 if not t.success():
3654 raise Exception("Expected signals not seen")
3655
3656def test_dbus_p2p_go_neg_rx(dev, apdev):
3657 """D-Bus P2P GO Negotiation receive"""
910eb5b5 3658 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3659 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3660 addr0 = dev[0].p2p_dev_addr()
3661
3662 class TestDbusP2p(TestDbus):
3663 def __init__(self, bus):
3664 TestDbus.__init__(self, bus)
3665 self.done = False
3666
3667 def __enter__(self):
3668 gobject.timeout_add(1, self.run_test)
3669 gobject.timeout_add(15000, self.timeout)
3670 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3671 "DeviceFound")
3672 self.add_signal(self.goNegotiationRequest,
3673 WPAS_DBUS_IFACE_P2PDEVICE,
3674 "GONegotiationRequest",
3675 byte_arrays=True)
3676 self.add_signal(self.goNegotiationSuccess,
3677 WPAS_DBUS_IFACE_P2PDEVICE,
3678 "GONegotiationSuccess",
3679 byte_arrays=True)
3680 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3681 "GroupStarted")
3682 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3683 "GroupFinished")
3684 self.loop.run()
3685 return self
3686
3687 def deviceFound(self, path):
3688 logger.debug("deviceFound: path=%s" % path)
3689
3690 def goNegotiationRequest(self, path, dev_passwd_id):
3691 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d" % (path, dev_passwd_id))
3692 if dev_passwd_id != 1:
3693 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
3694 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
3695 'go_intent': 15, 'persistent': False, 'frequency': 5175 }
3696 try:
3697 p2p.Connect(args)
3698 raise Exception("Invalid Connect accepted")
3699 except dbus.exceptions.DBusException, e:
3700 if "ConnectChannelUnsupported" not in str(e):
3701 raise Exception("Unexpected error message for invalid Connect: " + str(e))
3702
3703 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
3704 'go_intent': 15, 'persistent': False }
3705 p2p.Connect(args)
3706
3707 def goNegotiationSuccess(self, properties):
3708 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
3709
3710 def groupStarted(self, properties):
3711 logger.debug("groupStarted: " + str(properties))
3712 p2p.Disconnect()
3713
3714 def groupFinished(self, properties):
3715 logger.debug("groupFinished: " + str(properties))
3716 self.done = True
3717 self.loop.quit()
3718
3719 def run_test(self, *args):
3720 logger.debug("run_test")
3721 p2p.Listen(10)
3722 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3723 if not dev1.discover_peer(addr0):
3724 raise Exception("Peer not found")
3725 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
3726 return False
3727
3728 def success(self):
3729 return self.done
3730
3731 with TestDbusP2p(bus) as t:
3732 if not t.success():
3733 raise Exception("Expected signals not seen")
3734
3735def test_dbus_p2p_go_neg_auth(dev, apdev):
3736 """D-Bus P2P GO Negotiation authorized"""
910eb5b5 3737 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3738 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3739 addr0 = dev[0].p2p_dev_addr()
3740 dev[1].p2p_listen()
3741
3742 class TestDbusP2p(TestDbus):
3743 def __init__(self, bus):
3744 TestDbus.__init__(self, bus)
3745 self.done = False
3746 self.peer_joined = False
3747 self.peer_disconnected = False
3748
3749 def __enter__(self):
3750 gobject.timeout_add(1, self.run_test)
3751 gobject.timeout_add(15000, self.timeout)
3752 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3753 "DeviceFound")
3754 self.add_signal(self.goNegotiationSuccess,
3755 WPAS_DBUS_IFACE_P2PDEVICE,
3756 "GONegotiationSuccess",
3757 byte_arrays=True)
3758 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3759 "GroupStarted")
3760 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3761 "GroupFinished")
3762 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
3763 "StaDeauthorized")
3764 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
3765 "PeerJoined")
3766 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
3767 "PeerDisconnected")
3768 self.loop.run()
3769 return self
3770
3771 def deviceFound(self, path):
3772 logger.debug("deviceFound: path=%s" % path)
3773 args = { 'peer': path, 'wps_method': 'keypad',
3774 'go_intent': 15, 'authorize_only': True }
3775 try:
3776 p2p.Connect(args)
3777 raise Exception("Invalid Connect accepted")
3778 except dbus.exceptions.DBusException, e:
3779 if "InvalidArgs" not in str(e):
3780 raise Exception("Unexpected error message for invalid Connect: " + str(e))
3781
3782 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
3783 'go_intent': 15, 'authorize_only': True }
3784 p2p.Connect(args)
3785 p2p.Listen(10)
3786 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3787 if not dev1.discover_peer(addr0):
3788 raise Exception("Peer not found")
3789 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
3790 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
3791 if ev is None:
3792 raise Exception("Group formation timed out")
3793
3794 def goNegotiationSuccess(self, properties):
3795 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
3796
3797 def groupStarted(self, properties):
3798 logger.debug("groupStarted: " + str(properties))
3799 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3800 dev1.remove_group()
3801
3802 def staDeauthorized(self, name):
3803 logger.debug("staDeuthorized: " + name)
3804 p2p.Disconnect()
3805
3806 def peerJoined(self, peer):
3807 logger.debug("peerJoined: " + peer)
3808 self.peer_joined = True
3809
3810 def peerDisconnected(self, peer):
3811 logger.debug("peerDisconnected: " + peer)
3812 self.peer_disconnected = True
3813
3814 def groupFinished(self, properties):
3815 logger.debug("groupFinished: " + str(properties))
3816 self.done = True
3817 self.loop.quit()
3818
3819 def run_test(self, *args):
3820 logger.debug("run_test")
3821 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
3822 return False
3823
3824 def success(self):
3825 return self.done and self.peer_joined and self.peer_disconnected
3826
3827 with TestDbusP2p(bus) as t:
3828 if not t.success():
3829 raise Exception("Expected signals not seen")
3830
3831def test_dbus_p2p_go_neg_init(dev, apdev):
3832 """D-Bus P2P GO Negotiation initiation"""
910eb5b5 3833 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3834 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3835 addr0 = dev[0].p2p_dev_addr()
3836 dev[1].p2p_listen()
3837
3838 class TestDbusP2p(TestDbus):
3839 def __init__(self, bus):
3840 TestDbus.__init__(self, bus)
3841 self.done = False
3842
3843 def __enter__(self):
3844 gobject.timeout_add(1, self.run_test)
3845 gobject.timeout_add(15000, self.timeout)
3846 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3847 "DeviceFound")
3848 self.add_signal(self.goNegotiationSuccess,
3849 WPAS_DBUS_IFACE_P2PDEVICE,
3850 "GONegotiationSuccess",
3851 byte_arrays=True)
3852 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3853 "GroupStarted")
3854 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3855 "GroupFinished")
3856 self.loop.run()
3857 return self
3858
3859 def deviceFound(self, path):
3860 logger.debug("deviceFound: path=%s" % path)
3861 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3862 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
3863 'go_intent': 0 }
3864 p2p.Connect(args)
3865
3866 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
3867 if ev is None:
3868 raise Exception("Timeout while waiting for GO Neg Request")
3869 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
3870 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
3871 if ev is None:
3872 raise Exception("Group formation timed out")
3873
3874 def goNegotiationSuccess(self, properties):
3875 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
3876
3877 def groupStarted(self, properties):
3878 logger.debug("groupStarted: " + str(properties))
3879 p2p.Disconnect()
3880 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3881 dev1.remove_group()
3882
3883 def groupFinished(self, properties):
3884 logger.debug("groupFinished: " + str(properties))
3885 self.done = True
3886 self.loop.quit()
3887
3888 def run_test(self, *args):
3889 logger.debug("run_test")
3890 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
3891 return False
3892
3893 def success(self):
3894 return self.done
3895
3896 with TestDbusP2p(bus) as t:
3897 if not t.success():
3898 raise Exception("Expected signals not seen")
3899
3900def test_dbus_p2p_wps_failure(dev, apdev):
3901 """D-Bus P2P WPS failure"""
910eb5b5 3902 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3903 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3904 addr0 = dev[0].p2p_dev_addr()
3905
3906 class TestDbusP2p(TestDbus):
3907 def __init__(self, bus):
3908 TestDbus.__init__(self, bus)
3909 self.done = False
3910
3911 def __enter__(self):
3912 gobject.timeout_add(1, self.run_test)
3913 gobject.timeout_add(15000, self.timeout)
3914 self.add_signal(self.goNegotiationRequest,
3915 WPAS_DBUS_IFACE_P2PDEVICE,
3916 "GONegotiationRequest",
3917 byte_arrays=True)
3918 self.add_signal(self.goNegotiationSuccess,
3919 WPAS_DBUS_IFACE_P2PDEVICE,
3920 "GONegotiationSuccess",
3921 byte_arrays=True)
3922 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3923 "GroupStarted")
3924 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
3925 "WpsFailed")
3926 self.loop.run()
3927 return self
3928
3929 def goNegotiationRequest(self, path, dev_passwd_id):
3930 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d" % (path, dev_passwd_id))
3931 if dev_passwd_id != 1:
3932 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
3933 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
3934 'go_intent': 15 }
3935 p2p.Connect(args)
3936
3937 def goNegotiationSuccess(self, properties):
3938 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
3939
3940 def groupStarted(self, properties):
3941 logger.debug("groupStarted: " + str(properties))
3942 raise Exception("Unexpected GroupStarted")
3943
3944 def wpsFailed(self, name, args):
3945 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
3946 self.done = True
3947 self.loop.quit()
3948
3949 def run_test(self, *args):
3950 logger.debug("run_test")
3951 p2p.Listen(10)
3952 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3953 if not dev1.discover_peer(addr0):
3954 raise Exception("Peer not found")
3955 dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
3956 return False
3957
3958 def success(self):
3959 return self.done
3960
3961 with TestDbusP2p(bus) as t:
3962 if not t.success():
3963 raise Exception("Expected signals not seen")
3964
3965def test_dbus_p2p_two_groups(dev, apdev):
3966 """D-Bus P2P with two concurrent groups"""
910eb5b5 3967 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3968 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3969
3970 dev[0].request("SET p2p_no_group_iface 0")
3971 addr0 = dev[0].p2p_dev_addr()
3972 addr1 = dev[1].p2p_dev_addr()
3973 addr2 = dev[2].p2p_dev_addr()
3974 dev[1].p2p_start_go(freq=2412)
3975
3976 class TestDbusP2p(TestDbus):
3977 def __init__(self, bus):
3978 TestDbus.__init__(self, bus)
3979 self.done = False
3980 self.peer = None
3981 self.go = None
3982 self.group1 = None
3983 self.group2 = None
3984
3985 def __enter__(self):
3986 gobject.timeout_add(1, self.run_test)
3987 gobject.timeout_add(15000, self.timeout)
3988 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
3989 "PropertiesChanged", byte_arrays=True)
3990 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3991 "DeviceFound")
3992 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3993 "GroupStarted")
3994 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3995 "GroupFinished")
3996 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
3997 "PeerJoined")
3998 self.loop.run()
3999 return self
4000
4001 def propertiesChanged(self, interface_name, changed_properties,
4002 invalidated_properties):
4003 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4004
4005 def deviceFound(self, path):
4006 logger.debug("deviceFound: path=%s" % path)
4007 if addr2.replace(':','') in path:
4008 self.peer = path
4009 elif addr1.replace(':','') in path:
4010 self.go = path
4011 if self.go and not self.group1:
4012 logger.info("Join the group")
4013 p2p.StopFind()
4014 pin = '12345670'
4015 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4016 dev1.request("WPS_PIN any " + pin)
4017 args = { 'peer': self.go,
4018 'join': True,
4019 'wps_method': 'pin',
4020 'pin': pin,
4021 'frequency': 2412 }
4022 p2p.Connect(args)
4023
4024 def groupStarted(self, properties):
4025 logger.debug("groupStarted: " + str(properties))
4026 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4027 dbus_interface=dbus.PROPERTIES_IFACE)
4028 logger.debug("p2pdevice properties: " + str(prop))
4029
4030 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4031 properties['group_object'])
4032 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4033 dbus_interface=dbus.PROPERTIES_IFACE,
4034 byte_arrays=True)
4035 logger.debug("Group properties: " + str(res))
4036
4037 if not self.group1:
4038 self.group1 = properties['group_object']
4039 self.group1iface = properties['interface_object']
4040 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4041 self.group1iface)
4042
4043 logger.info("Start autonomous GO")
4044 params = dbus.Dictionary({ 'frequency': 2412 })
4045 p2p.GroupAdd(params)
4046 elif not self.group2:
4047 self.group2 = properties['group_object']
4048 self.group2iface = properties['interface_object']
4049 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4050 self.group2iface)
4051 self.g2_bssid = res['BSSID']
4052
4053 if self.group1 and self.group2:
4054 logger.info("Authorize peer to join the group")
4055 a2 = binascii.unhexlify(addr2.replace(':',''))
4056 params = { 'Role': 'enrollee',
4057 'P2PDeviceAddress': dbus.ByteArray(a2),
4058 'Bssid': dbus.ByteArray(a2),
4059 'Type': 'pin',
4060 'Pin': '12345670' }
4061 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
4062 g_wps.Start(params)
4063
4064 bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
4065 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
4066 dev2.scan_for_bss(bssid, freq=2412)
4067 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
4068
4069 def groupFinished(self, properties):
4070 logger.debug("groupFinished: " + str(properties))
4071
4072 if self.group1 == properties['group_object']:
4073 self.group1 = None
4074 elif self.group2 == properties['group_object']:
4075 self.group2 = None
4076
4077 if not self.group1 and not self.group2:
4078 self.done = True
4079 self.loop.quit()
4080
4081 def peerJoined(self, peer):
4082 logger.debug("peerJoined: " + peer)
4083 self.check_results()
4084
4085 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
4086 dev2.remove_group()
4087
4088 logger.info("Disconnect group2")
4089 group_p2p = dbus.Interface(self.g2_if_obj,
4090 WPAS_DBUS_IFACE_P2PDEVICE)
4091 group_p2p.Disconnect()
4092
4093 logger.info("Disconnect group1")
4094 group_p2p = dbus.Interface(self.g1_if_obj,
4095 WPAS_DBUS_IFACE_P2PDEVICE)
4096 group_p2p.Disconnect()
4097
4098 def check_results(self):
4099 logger.info("Check results with two concurrent groups in operation")
4100
4101 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
4102 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
4103 dbus_interface=dbus.PROPERTIES_IFACE,
4104 byte_arrays=True)
4105
4106 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
4107 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
4108 dbus_interface=dbus.PROPERTIES_IFACE,
4109 byte_arrays=True)
4110
4111 logger.info("group1 = " + self.group1)
4112 logger.debug("Group properties: " + str(res1))
4113
4114 logger.info("group2 = " + self.group2)
4115 logger.debug("Group properties: " + str(res2))
4116
4117 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4118 dbus_interface=dbus.PROPERTIES_IFACE)
4119 logger.debug("p2pdevice properties: " + str(prop))
4120
4121 if res1['Role'] != 'client':
4122 raise Exception("Group1 role reported incorrectly: " + res1['Role'])
4123 if res2['Role'] != 'GO':
4124 raise Exception("Group2 role reported incorrectly: " + res2['Role'])
4125 if prop['Role'] != 'device':
4126 raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])
4127
4128 if len(res2['Members']) != 1:
4129 raise Exception("Unexpected Members value for group 2")
4130
4131 def run_test(self, *args):
4132 logger.debug("run_test")
4133 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4134 return False
4135
4136 def success(self):
4137 return self.done
4138
4139 with TestDbusP2p(bus) as t:
4140 if not t.success():
4141 raise Exception("Expected signals not seen")
4142
4143 dev[1].remove_group()
0e126c6d
JM
4144
4145def test_dbus_introspect(dev, apdev):
4146 """D-Bus introspection"""
4147 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4148
4149 res = if_obj.Introspect(WPAS_DBUS_IFACE,
4150 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4151 logger.info("Initial Introspect: " + str(res))
4152 if res is None or "Introspectable" not in res or "GroupStarted" not in res:
4153 raise Exception("Unexpected initial Introspect response: " + str(res))
4154
4155 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
4156 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4157 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4158 logger.info("Introspect: " + str(res2))
4159 if res2 is not None:
4160 raise Exception("Unexpected Introspect response")
4161
4162 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"):
4163 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4164 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4165 logger.info("Introspect: " + str(res2))
4166 if res2 is None:
4167 raise Exception("No Introspect response")
4168 if len(res2) >= len(res):
4169 raise Exception("Unexpected Introspect response")
4170
4171 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
4172 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4173 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4174 logger.info("Introspect: " + str(res2))
4175 if res2 is None:
4176 raise Exception("No Introspect response")
4177 if len(res2) >= len(res):
4178 raise Exception("Unexpected Introspect response")
4179
4180 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"):
4181 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4182 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4183 logger.info("Introspect: " + str(res2))
4184 if res2 is None:
4185 raise Exception("No Introspect response")
4186 if len(res2) >= len(res):
4187 raise Exception("Unexpected Introspect response")