]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
tests: EAP-FAST local error cases
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67 1# wpa_supplicant D-Bus interface tests
910eb5b5 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
2ec82e67
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
2ec82e67
JM
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12
910eb5b5 13try:
b4de353c 14 import gobject
910eb5b5
JM
15 import dbus
16 dbus_imported = True
17except ImportError:
18 dbus_imported = False
19
2ec82e67
JM
20import hostapd
21from wpasupplicant import WpaSupplicant
708ec753 22from utils import HwsimSkip, alloc_fail, fail_test
1992af11 23from p2p_utils import *
2ec82e67 24from test_ap_tdls import connect_2sta_open
089e7ca3 25from test_ap_eap import check_altsubject_match_support
2ec82e67
JM
26
27WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
28WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
29WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
30WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
31WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
32WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
33WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
34WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
35WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
36WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
37
38def prepare_dbus(dev):
910eb5b5
JM
39 if not dbus_imported:
40 logger.info("No dbus module available")
51c5aeb4 41 raise HwsimSkip("No dbus module available")
2ec82e67 42 try:
2ec82e67
JM
43 from dbus.mainloop.glib import DBusGMainLoop
44 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
45 bus = dbus.SystemBus()
46 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
47 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
48 path = wpas.GetInterface(dev.ifname)
49 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
910eb5b5 50 return (bus,wpas_obj,path,if_obj)
2ec82e67 51 except Exception, e:
51c5aeb4 52 raise HwsimSkip("Could not connect to D-Bus: %s" % e)
2ec82e67
JM
53
54class TestDbus(object):
55 def __init__(self, bus):
56 self.loop = gobject.MainLoop()
57 self.signals = []
58 self.bus = bus
59
60 def __exit__(self, type, value, traceback):
61 for s in self.signals:
62 s.remove()
63
64 def add_signal(self, handler, interface, name, byte_arrays=False):
65 s = self.bus.add_signal_receiver(handler, dbus_interface=interface,
66 signal_name=name,
67 byte_arrays=byte_arrays)
68 self.signals.append(s)
69
70 def timeout(self, *args):
71 logger.debug("timeout")
72 self.loop.quit()
73 return False
74
0e126c6d
JM
75class alloc_fail_dbus(object):
76 def __init__(self, dev, count, funcs, operation="Operation",
77 expected="NoMemory"):
78 self._dev = dev
79 self._count = count
80 self._funcs = funcs
81 self._operation = operation
82 self._expected = expected
83 def __enter__(self):
84 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
85 if "OK" not in self._dev.request(cmd):
86 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
87 def __exit__(self, type, value, traceback):
88 if type is None:
89 raise Exception("%s succeeded during out-of-memory" % self._operation)
90 if type == dbus.exceptions.DBusException and self._expected in str(value):
91 return True
92 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
93 raise Exception("%s did not trigger allocation failure" % self._operation)
94 return False
95
3a59cda1
JM
96def start_ap(ap, ssid="test-wps",
97 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
2ec82e67
JM
98 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
99 "wpa_passphrase": "12345678", "wpa": "2",
100 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
3a59cda1 101 "ap_pin": "12345670", "uuid": ap_uuid}
2ec82e67
JM
102 return hostapd.add_ap(ap['ifname'], params)
103
104def test_dbus_getall(dev, apdev):
105 """D-Bus GetAll"""
910eb5b5 106 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
107
108 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
109 dbus_interface=dbus.PROPERTIES_IFACE)
110 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))
111
112 props = if_obj.GetAll(WPAS_DBUS_IFACE,
113 dbus_interface=dbus.PROPERTIES_IFACE)
114 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))
115
116 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
117 dbus_interface=dbus.PROPERTIES_IFACE)
118 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))
119
120 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
121 dbus_interface=dbus.PROPERTIES_IFACE)
122 if len(res) != 0:
123 raise Exception("Unexpected BSSs entry: " + str(res))
124
125 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
126 dbus_interface=dbus.PROPERTIES_IFACE)
127 if len(res) != 0:
128 raise Exception("Unexpected Networks entry: " + str(res))
129
130 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
131 bssid = apdev[0]['bssid']
132 dev[0].scan_for_bss(bssid, freq=2412)
133 id = dev[0].add_network()
134 dev[0].set_network(id, "disabled", "0")
135 dev[0].set_network_quoted(id, "ssid", "test")
136
137 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
138 dbus_interface=dbus.PROPERTIES_IFACE)
139 if len(res) != 1:
140 raise Exception("Missing BSSs entry: " + str(res))
141 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
142 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
143 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
144 bssid_str = ''
145 for item in props['BSSID']:
146 if len(bssid_str) > 0:
147 bssid_str += ':'
148 bssid_str += '%02x' % item
149 if bssid_str != bssid:
150 raise Exception("Unexpected BSSID in BSSs entry")
151
152 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
153 dbus_interface=dbus.PROPERTIES_IFACE)
154 if len(res) != 1:
155 raise Exception("Missing Networks entry: " + str(res))
156 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
157 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
158 dbus_interface=dbus.PROPERTIES_IFACE)
159 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
160 ssid = props['Properties']['ssid']
161 if ssid != '"test"':
162 raise Exception("Unexpected SSID in network entry")
163
164def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
165 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
166 dbus_interface=dbus.PROPERTIES_IFACE,
167 byte_arrays=byte_arrays)
168 if expect is not None and val != expect:
169 raise Exception("Unexpected %s: %s (expected: %s)" %
170 (prop, str(val), str(expect)))
171 return val
172
173def dbus_set(dbus, wpas_obj, prop, val):
174 wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
175 dbus_interface=dbus.PROPERTIES_IFACE)
176
177def test_dbus_properties(dev, apdev):
178 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
910eb5b5 179 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
180
181 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
182 dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
183 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
184 for (val,err) in [ (3, "Error.Failed: wrong property type"),
185 ("foo", "Error.Failed: wrong debug level value") ]:
186 try:
187 dbus_set(dbus, wpas_obj, "DebugLevel", val)
188 raise Exception("Invalid DebugLevel value accepted: " + str(val))
189 except dbus.exceptions.DBusException, e:
190 if err not in str(e):
191 raise Exception("Unexpected error message: " + str(e))
192 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
193 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
194
195 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
196 dbus_set(dbus, wpas_obj, "DebugTimestamp", False)
197 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False)
198 try:
199 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo")
200 raise Exception("Invalid DebugTimestamp value accepted")
201 except dbus.exceptions.DBusException, e:
202 if "Error.Failed: wrong property type" not in str(e):
203 raise Exception("Unexpected error message: " + str(e))
204 dbus_set(dbus, wpas_obj, "DebugTimestamp", True)
205 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
206
207 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
208 dbus_set(dbus, wpas_obj, "DebugShowKeys", False)
209 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False)
210 try:
211 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo")
212 raise Exception("Invalid DebugShowKeys value accepted")
213 except dbus.exceptions.DBusException, e:
214 if "Error.Failed: wrong property type" not in str(e):
215 raise Exception("Unexpected error message: " + str(e))
216 dbus_set(dbus, wpas_obj, "DebugShowKeys", True)
217 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
218
219 res = dbus_get(dbus, wpas_obj, "Interfaces")
220 if len(res) != 1:
221 raise Exception("Unexpected Interfaces value: " + str(res))
222
223 res = dbus_get(dbus, wpas_obj, "EapMethods")
224 if len(res) < 5 or "TTLS" not in res:
225 raise Exception("Unexpected EapMethods value: " + str(res))
226
227 res = dbus_get(dbus, wpas_obj, "Capabilities")
228 if len(res) < 2 or "p2p" not in res:
229 raise Exception("Unexpected Capabilities value: " + str(res))
230
231 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
232 val = binascii.unhexlify("010006020304050608")
233 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
234 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
235 if val != res:
236 raise Exception("WFDIEs value changed")
237 try:
238 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
239 raise Exception("Invalid WFDIEs value accepted")
240 except dbus.exceptions.DBusException, e:
241 if "InvalidArgs" not in str(e):
242 raise Exception("Unexpected error message: " + str(e))
243 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
244 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
245 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
246 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
247 if len(res) != 0:
248 raise Exception("WFDIEs not cleared properly")
249
250 res = dbus_get(dbus, wpas_obj, "EapMethods")
251 try:
252 dbus_set(dbus, wpas_obj, "EapMethods", res)
253 raise Exception("Invalid Set accepted")
254 except dbus.exceptions.DBusException, e:
255 if "InvalidArgs: Property is read-only" not in str(e):
256 raise Exception("Unexpected error message: " + str(e))
257
258 try:
259 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
260 dbus_interface=dbus.PROPERTIES_IFACE)
261 raise Exception("Unknown method accepted")
262 except dbus.exceptions.DBusException, e:
263 if "UnknownMethod" not in str(e):
264 raise Exception("Unexpected error message: " + str(e))
265
266 try:
267 wpas_obj.Get("foo", "DebugShowKeys",
268 dbus_interface=dbus.PROPERTIES_IFACE)
269 raise Exception("Invalid Get accepted")
270 except dbus.exceptions.DBusException, e:
271 if "InvalidArgs: No such property" not in str(e):
272 raise Exception("Unexpected error message: " + str(e))
273
274 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
275 introspect=False)
276 try:
277 test_obj.Get(123, "DebugShowKeys",
278 dbus_interface=dbus.PROPERTIES_IFACE)
279 raise Exception("Invalid Get accepted")
280 except dbus.exceptions.DBusException, e:
281 if "InvalidArgs: Invalid arguments" not in str(e):
282 raise Exception("Unexpected error message: " + str(e))
283 try:
284 test_obj.Get(WPAS_DBUS_SERVICE, 123,
285 dbus_interface=dbus.PROPERTIES_IFACE)
286 raise Exception("Invalid Get accepted")
287 except dbus.exceptions.DBusException, e:
288 if "InvalidArgs: Invalid arguments" not in str(e):
289 raise Exception("Unexpected error message: " + str(e))
290
291 try:
292 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
293 dbus.ByteArray('', variant_level=2),
294 dbus_interface=dbus.PROPERTIES_IFACE)
295 raise Exception("Invalid Set accepted")
296 except dbus.exceptions.DBusException, e:
297 if "InvalidArgs: invalid message format" not in str(e):
298 raise Exception("Unexpected error message: " + str(e))
299
f0ef6889
DW
300def test_dbus_set_global_properties(dev, apdev):
301 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
302 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
303
304 props = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]
305
306 for p in props:
307 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
308 dbus_interface=dbus.PROPERTIES_IFACE)
309 if res != p[1]:
310 raise Exception("Unexpected " + p[0] + " value: " + str(res))
311
312 if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2],
313 dbus_interface=dbus.PROPERTIES_IFACE)
314
315 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
316 dbus_interface=dbus.PROPERTIES_IFACE)
317 if res != p[2]:
318 raise Exception("Unexpected " + p[0] + " value after set: " + str(res))
319
2ec82e67
JM
320def test_dbus_invalid_method(dev, apdev):
321 """D-Bus invalid method"""
910eb5b5 322 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
323 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
324
325 try:
326 wps.Foo()
327 raise Exception("Unknown method accepted")
328 except dbus.exceptions.DBusException, e:
329 if "UnknownMethod" not in str(e):
330 raise Exception("Unexpected error message: " + str(e))
331
332 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
333 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
334 try:
335 test_wps.Start(123)
336 raise Exception("WPS.Start with incorrect signature accepted")
337 except dbus.exceptions.DBusException, e:
338 if "InvalidArgs: Invalid arg" not in str(e):
339 raise Exception("Unexpected error message: " + str(e))
340
341def test_dbus_get_set_wps(dev, apdev):
342 """D-Bus Get/Set for WPS properties"""
343 try:
81e787b7 344 _test_dbus_get_set_wps(dev, apdev)
2ec82e67
JM
345 finally:
346 dev[0].request("SET wps_cred_processing 0")
364e28c9 347 dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
2ec82e67
JM
348
349def _test_dbus_get_set_wps(dev, apdev):
910eb5b5 350 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
351
352 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
353 dbus_interface=dbus.PROPERTIES_IFACE)
354
355 val = "display keypad virtual_display nfc_interface"
356 dev[0].request("SET config_methods " + val)
357
358 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
359 dbus_interface=dbus.PROPERTIES_IFACE)
360 if config != val:
361 raise Exception("Unexpected Get(ConfigMethods) result: " + config)
362
363 val2 = "push_button display"
364 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
365 dbus_interface=dbus.PROPERTIES_IFACE)
366 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
367 dbus_interface=dbus.PROPERTIES_IFACE)
368 if config != val2:
369 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)
370
371 dev[0].request("SET config_methods " + val)
372
373 for i in range(3):
374 dev[0].request("SET wps_cred_processing " + str(i))
375 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
376 dbus_interface=dbus.PROPERTIES_IFACE)
377 expected_val = False if i == 1 else True
378 if val != expected_val:
379 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))
380
381 class TestDbusGetSet(TestDbus):
382 def __init__(self, bus):
383 TestDbus.__init__(self, bus)
384 self.signal_received = False
385 self.signal_received_deprecated = False
386 self.sets_done = False
387
388 def __enter__(self):
389 gobject.timeout_add(1, self.run_sets)
390 gobject.timeout_add(1000, self.timeout)
391 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
392 "PropertiesChanged")
393 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
394 "PropertiesChanged")
395 self.loop.run()
396 return self
397
398 def propertiesChanged(self, properties):
399 logger.debug("PropertiesChanged: " + str(properties))
400 if properties.has_key("ProcessCredentials"):
401 self.signal_received_deprecated = True
402 if self.sets_done and self.signal_received:
403 self.loop.quit()
404
405 def propertiesChanged2(self, interface_name, changed_properties,
406 invalidated_properties):
407 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
408 if interface_name != WPAS_DBUS_IFACE_WPS:
409 return
410 if changed_properties.has_key("ProcessCredentials"):
411 self.signal_received = True
412 if self.sets_done and self.signal_received_deprecated:
413 self.loop.quit()
414
415 def run_sets(self, *args):
416 logger.debug("run_sets")
417 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
418 dbus.Boolean(1),
419 dbus_interface=dbus.PROPERTIES_IFACE)
420 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
421 dbus_interface=dbus.PROPERTIES_IFACE) != True:
422 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
423 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
424 dbus.Boolean(0),
425 dbus_interface=dbus.PROPERTIES_IFACE)
426 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
427 dbus_interface=dbus.PROPERTIES_IFACE) != False:
428 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
429
430 self.dbus_sets_done = True
431 return False
432
433 def success(self):
434 return self.signal_received and self.signal_received_deprecated
435
436 with TestDbusGetSet(bus) as t:
437 if not t.success():
438 raise Exception("No signal received for ProcessCredentials change")
439
440def test_dbus_wps_invalid(dev, apdev):
441 """D-Bus invaldi WPS operation"""
910eb5b5 442 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
443 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
444
445 failures = [ {'Role': 'foo', 'Type': 'pbc'},
446 {'Role': 123, 'Type': 'pbc'},
447 {'Type': 'pbc'},
448 {'Role': 'enrollee'},
449 {'Role': 'registrar'},
450 {'Role': 'enrollee', 'Type': 123},
451 {'Role': 'enrollee', 'Type': 'foo'},
452 {'Role': 'enrollee', 'Type': 'pbc',
453 'Bssid': '02:33:44:55:66:77'},
454 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
455 {'Role': 'enrollee', 'Type': 'pbc',
456 'Bssid': dbus.ByteArray('12345')},
457 {'Role': 'enrollee', 'Type': 'pbc',
458 'P2PDeviceAddress': 12345},
459 {'Role': 'enrollee', 'Type': 'pbc',
460 'P2PDeviceAddress': dbus.ByteArray('12345')},
461 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
462 for args in failures:
463 try:
464 wps.Start(args)
465 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
466 except dbus.exceptions.DBusException, e:
467 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
468 raise Exception("Unexpected error message: " + str(e))
469
0e126c6d
JM
470def test_dbus_wps_oom(dev, apdev):
471 """D-Bus WPS operation (OOM)"""
472 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
473 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
474
475 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
476 if_obj.Get(WPAS_DBUS_IFACE, "State",
477 dbus_interface=dbus.PROPERTIES_IFACE)
478
479 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
480 bssid = apdev[0]['bssid']
481 dev[0].scan_for_bss(bssid, freq=2412)
482
1d32bc2c 483 time.sleep(0.05)
0e126c6d
JM
484 for i in range(1, 3):
485 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
486 if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
487 dbus_interface=dbus.PROPERTIES_IFACE)
488
489 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
490 dbus_interface=dbus.PROPERTIES_IFACE)
491 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
492 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
493 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
494 dbus_interface=dbus.PROPERTIES_IFACE)
495
496 id = dev[0].add_network()
497 dev[0].set_network(id, "disabled", "0")
498 dev[0].set_network_quoted(id, "ssid", "test")
499
500 for i in range(1, 3):
501 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
502 if_obj.Get(WPAS_DBUS_IFACE, "Networks",
503 dbus_interface=dbus.PROPERTIES_IFACE)
504
505 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
506 dbus_get(dbus, wpas_obj, "Interfaces")
507
508 for i in range(1, 6):
509 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
510 dbus_get(dbus, wpas_obj, "EapMethods")
511
512 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
513 expected="Error.Failed: Failed to set property"):
514 val2 = "push_button display"
515 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
516 dbus_interface=dbus.PROPERTIES_IFACE)
517
518 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
519 "WPS.Start",
520 expected="UnknownError: WPS start failed"):
521 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
522
2ec82e67
JM
523def test_dbus_wps_pbc(dev, apdev):
524 """D-Bus WPS/PBC operation and signals"""
525 try:
81e787b7 526 _test_dbus_wps_pbc(dev, apdev)
2ec82e67
JM
527 finally:
528 dev[0].request("SET wps_cred_processing 0")
529
530def _test_dbus_wps_pbc(dev, apdev):
910eb5b5 531 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
532 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
533
534 hapd = start_ap(apdev[0])
535 hapd.request("WPS_PBC")
536 bssid = apdev[0]['bssid']
537 dev[0].scan_for_bss(bssid, freq="2412")
538 dev[0].request("SET wps_cred_processing 2")
539
1d0a917f
JM
540 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
541 dbus_interface=dbus.PROPERTIES_IFACE)
542 if len(res) != 1:
543 raise Exception("Missing BSSs entry: " + str(res))
544 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
545 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
546 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
547 if 'WPS' not in props:
548 raise Exception("No WPS information in the BSS entry")
549 if 'Type' not in props['WPS']:
550 raise Exception("No Type field in the WPS dictionary")
551 if props['WPS']['Type'] != 'pbc':
552 raise Exception("Unexpected WPS Type: " + props['WPS']['Type'])
553
2ec82e67
JM
554 class TestDbusWps(TestDbus):
555 def __init__(self, bus, wps):
556 TestDbus.__init__(self, bus)
557 self.success_seen = False
558 self.credentials_received = False
559 self.wps = wps
560
561 def __enter__(self):
562 gobject.timeout_add(1, self.start_pbc)
563 gobject.timeout_add(15000, self.timeout)
564 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
565 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
566 "Credentials")
567 self.loop.run()
568 return self
569
570 def wpsEvent(self, name, args):
571 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
572 if name == "success":
573 self.success_seen = True
574 if self.credentials_received:
575 self.loop.quit()
576
577 def credentials(self, args):
578 logger.debug("credentials: " + str(args))
579 self.credentials_received = True
580 if self.success_seen:
581 self.loop.quit()
582
583 def start_pbc(self, *args):
584 logger.debug("start_pbc")
585 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
586 return False
587
588 def success(self):
589 return self.success_seen and self.credentials_received
590
591 with TestDbusWps(bus, wps) as t:
592 if not t.success():
593 raise Exception("Failure in D-Bus operations")
594
595 dev[0].wait_connected(timeout=10)
596 dev[0].request("DISCONNECT")
597 hapd.disable()
598 dev[0].flush_scan_cache()
599
3a59cda1
JM
600def test_dbus_wps_pbc_overlap(dev, apdev):
601 """D-Bus WPS/PBC operation and signal for PBC overlap"""
602 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
603 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
604
605 hapd = start_ap(apdev[0])
606 hapd2 = start_ap(apdev[1], ssid="test-wps2",
607 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
608 hapd.request("WPS_PBC")
609 hapd2.request("WPS_PBC")
610 bssid = apdev[0]['bssid']
611 dev[0].scan_for_bss(bssid, freq="2412")
612 bssid2 = apdev[1]['bssid']
613 dev[0].scan_for_bss(bssid2, freq="2412")
614
615 class TestDbusWps(TestDbus):
616 def __init__(self, bus, wps):
617 TestDbus.__init__(self, bus)
618 self.overlap_seen = False
619 self.wps = wps
620
621 def __enter__(self):
622 gobject.timeout_add(1, self.start_pbc)
623 gobject.timeout_add(15000, self.timeout)
624 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
625 self.loop.run()
626 return self
627
628 def wpsEvent(self, name, args):
629 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
630 if name == "pbc-overlap":
631 self.overlap_seen = True
632 self.loop.quit()
633
634 def start_pbc(self, *args):
635 logger.debug("start_pbc")
636 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
637 return False
638
639 def success(self):
640 return self.overlap_seen
641
642 with TestDbusWps(bus, wps) as t:
643 if not t.success():
644 raise Exception("Failure in D-Bus operations")
645
646 dev[0].request("WPS_CANCEL")
647 dev[0].request("DISCONNECT")
648 hapd.disable()
649 dev[0].flush_scan_cache()
650
2ec82e67
JM
651def test_dbus_wps_pin(dev, apdev):
652 """D-Bus WPS/PIN operation and signals"""
653 try:
81e787b7 654 _test_dbus_wps_pin(dev, apdev)
2ec82e67
JM
655 finally:
656 dev[0].request("SET wps_cred_processing 0")
657
658def _test_dbus_wps_pin(dev, apdev):
910eb5b5 659 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
660 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
661
662 hapd = start_ap(apdev[0])
663 hapd.request("WPS_PIN any 12345670")
664 bssid = apdev[0]['bssid']
665 dev[0].scan_for_bss(bssid, freq="2412")
666 dev[0].request("SET wps_cred_processing 2")
667
668 class TestDbusWps(TestDbus):
669 def __init__(self, bus):
670 TestDbus.__init__(self, bus)
671 self.success_seen = False
672 self.credentials_received = False
673
674 def __enter__(self):
675 gobject.timeout_add(1, self.start_pin)
676 gobject.timeout_add(15000, self.timeout)
677 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
678 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
679 "Credentials")
680 self.loop.run()
681 return self
682
683 def wpsEvent(self, name, args):
684 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
685 if name == "success":
686 self.success_seen = True
687 if self.credentials_received:
688 self.loop.quit()
689
690 def credentials(self, args):
691 logger.debug("credentials: " + str(args))
692 self.credentials_received = True
693 if self.success_seen:
694 self.loop.quit()
695
696 def start_pin(self, *args):
697 logger.debug("start_pin")
698 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
699 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
700 'Bssid': bssid_ay})
701 return False
702
703 def success(self):
704 return self.success_seen and self.credentials_received
705
706 with TestDbusWps(bus) as t:
707 if not t.success():
708 raise Exception("Failure in D-Bus operations")
709
710 dev[0].wait_connected(timeout=10)
711
712def test_dbus_wps_pin2(dev, apdev):
713 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
714 try:
81e787b7 715 _test_dbus_wps_pin2(dev, apdev)
2ec82e67
JM
716 finally:
717 dev[0].request("SET wps_cred_processing 0")
718
719def _test_dbus_wps_pin2(dev, apdev):
910eb5b5 720 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
721 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
722
723 hapd = start_ap(apdev[0])
724 bssid = apdev[0]['bssid']
725 dev[0].scan_for_bss(bssid, freq="2412")
726 dev[0].request("SET wps_cred_processing 2")
727
728 class TestDbusWps(TestDbus):
729 def __init__(self, bus):
730 TestDbus.__init__(self, bus)
731 self.success_seen = False
732 self.failed = False
733
734 def __enter__(self):
735 gobject.timeout_add(1, self.start_pin)
736 gobject.timeout_add(15000, self.timeout)
737 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
738 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
739 "Credentials")
740 self.loop.run()
741 return self
742
743 def wpsEvent(self, name, args):
744 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
745 if name == "success":
746 self.success_seen = True
747 if self.credentials_received:
748 self.loop.quit()
749
750 def credentials(self, args):
751 logger.debug("credentials: " + str(args))
752 self.credentials_received = True
753 if self.success_seen:
754 self.loop.quit()
755
756 def start_pin(self, *args):
757 logger.debug("start_pin")
758 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
759 res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
760 'Bssid': bssid_ay})
761 pin = res['Pin']
762 h = hostapd.Hostapd(apdev[0]['ifname'])
763 h.request("WPS_PIN any " + pin)
764 return False
765
766 def success(self):
767 return self.success_seen and self.credentials_received
768
769 with TestDbusWps(bus) as t:
770 if not t.success():
771 raise Exception("Failure in D-Bus operations")
772
773 dev[0].wait_connected(timeout=10)
774
775def test_dbus_wps_pin_m2d(dev, apdev):
776 """D-Bus WPS/PIN operation and signals with M2D"""
777 try:
81e787b7 778 _test_dbus_wps_pin_m2d(dev, apdev)
2ec82e67
JM
779 finally:
780 dev[0].request("SET wps_cred_processing 0")
781
782def _test_dbus_wps_pin_m2d(dev, apdev):
910eb5b5 783 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
784 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
785
786 hapd = start_ap(apdev[0])
787 bssid = apdev[0]['bssid']
788 dev[0].scan_for_bss(bssid, freq="2412")
789 dev[0].request("SET wps_cred_processing 2")
790
791 class TestDbusWps(TestDbus):
792 def __init__(self, bus):
793 TestDbus.__init__(self, bus)
794 self.success_seen = False
795 self.credentials_received = False
796
797 def __enter__(self):
798 gobject.timeout_add(1, self.start_pin)
799 gobject.timeout_add(15000, self.timeout)
800 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
801 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
802 "Credentials")
803 self.loop.run()
804 return self
805
806 def wpsEvent(self, name, args):
807 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
808 if name == "success":
809 self.success_seen = True
810 if self.credentials_received:
811 self.loop.quit()
812 elif name == "m2d":
813 h = hostapd.Hostapd(apdev[0]['ifname'])
814 h.request("WPS_PIN any 12345670")
815
816 def credentials(self, args):
817 logger.debug("credentials: " + str(args))
818 self.credentials_received = True
819 if self.success_seen:
820 self.loop.quit()
821
822 def start_pin(self, *args):
823 logger.debug("start_pin")
824 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
825 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
826 'Bssid': bssid_ay})
827 return False
828
829 def success(self):
830 return self.success_seen and self.credentials_received
831
832 with TestDbusWps(bus) as t:
833 if not t.success():
834 raise Exception("Failure in D-Bus operations")
835
836 dev[0].wait_connected(timeout=10)
837
838def test_dbus_wps_reg(dev, apdev):
839 """D-Bus WPS/Registrar operation and signals"""
840 try:
81e787b7 841 _test_dbus_wps_reg(dev, apdev)
2ec82e67
JM
842 finally:
843 dev[0].request("SET wps_cred_processing 0")
844
845def _test_dbus_wps_reg(dev, apdev):
910eb5b5 846 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
847 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
848
849 hapd = start_ap(apdev[0])
850 hapd.request("WPS_PIN any 12345670")
851 bssid = apdev[0]['bssid']
852 dev[0].scan_for_bss(bssid, freq="2412")
853 dev[0].request("SET wps_cred_processing 2")
854
855 class TestDbusWps(TestDbus):
856 def __init__(self, bus):
857 TestDbus.__init__(self, bus)
858 self.credentials_received = False
859
860 def __enter__(self):
861 gobject.timeout_add(100, self.start_reg)
862 gobject.timeout_add(15000, self.timeout)
863 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
864 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
865 "Credentials")
866 self.loop.run()
867 return self
868
869 def wpsEvent(self, name, args):
870 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
871
872 def credentials(self, args):
873 logger.debug("credentials: " + str(args))
874 self.credentials_received = True
875 self.loop.quit()
876
877 def start_reg(self, *args):
878 logger.debug("start_reg")
879 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
880 wps.Start({'Role': 'registrar', 'Type': 'pin',
881 'Pin': '12345670', 'Bssid': bssid_ay})
882 return False
883
884 def success(self):
885 return self.credentials_received
886
887 with TestDbusWps(bus) as t:
888 if not t.success():
889 raise Exception("Failure in D-Bus operations")
890
891 dev[0].wait_connected(timeout=10)
892
2a8e3f35
JM
893def test_dbus_wps_cancel(dev, apdev):
894 """D-Bus WPS Cancel operation"""
895 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
896 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
897
898 hapd = start_ap(apdev[0])
899 bssid = apdev[0]['bssid']
900
901 wps.Cancel()
902 dev[0].scan_for_bss(bssid, freq="2412")
903 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
904 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
905 'Bssid': bssid_ay})
906 wps.Cancel()
907 dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
908
2ec82e67
JM
909def test_dbus_scan_invalid(dev, apdev):
910 """D-Bus invalid scan method"""
910eb5b5 911 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
912 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
913
914 tests = [ ({}, "InvalidArgs"),
915 ({'Type': 123}, "InvalidArgs"),
916 ({'Type': 'foo'}, "InvalidArgs"),
917 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
918 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
919 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
920 ({'Type': 'active',
921 'SSIDs': [ dbus.ByteArray("1"), dbus.ByteArray("2"),
922 dbus.ByteArray("3"), dbus.ByteArray("4"),
923 dbus.ByteArray("5"), dbus.ByteArray("6"),
924 dbus.ByteArray("7"), dbus.ByteArray("8"),
925 dbus.ByteArray("9"), dbus.ByteArray("10"),
926 dbus.ByteArray("11"), dbus.ByteArray("12"),
927 dbus.ByteArray("13"), dbus.ByteArray("14"),
928 dbus.ByteArray("15"), dbus.ByteArray("16"),
929 dbus.ByteArray("17") ]},
930 "InvalidArgs"),
931 ({'Type': 'active',
932 'SSIDs': [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]},
933 "InvalidArgs"),
934 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
935 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
936 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
937 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
938 ({'Type': 'active',
939 'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
940 "InvalidArgs"),
941 ({'Type': 'active',
942 'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
943 "InvalidArgs"),
944 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
945 ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
946 "InvalidArgs"),
947 ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
948 "InvalidArgs")]
949 for (t,err) in tests:
950 try:
951 iface.Scan(t)
952 raise Exception("Invalid Scan() arguments accepted: " + str(t))
953 except dbus.exceptions.DBusException, e:
954 if err not in str(e):
955 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
956
0e126c6d
JM
957def test_dbus_scan_oom(dev, apdev):
958 """D-Bus scan method and OOM"""
959 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
960 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
961
962 with alloc_fail_dbus(dev[0], 1,
963 "wpa_scan_clone_params;wpas_dbus_handler_scan",
964 "Scan", expected="ScanError: Scan request rejected"):
965 iface.Scan({ 'Type': 'passive',
966 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
967
968 with alloc_fail_dbus(dev[0], 1,
969 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
970 "Scan"):
971 iface.Scan({ 'Type': 'passive',
972 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
973
974 with alloc_fail_dbus(dev[0], 1,
975 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
976 "Scan"):
977 iface.Scan({ 'Type': 'active',
978 'IEs': [ dbus.ByteArray("\xdd\x00") ],
979 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
980
981 with alloc_fail_dbus(dev[0], 1,
982 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
983 "Scan"):
984 iface.Scan({ 'Type': 'active',
985 'SSIDs': [ dbus.ByteArray("open"),
986 dbus.ByteArray() ],
987 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
988
2ec82e67
JM
989def test_dbus_scan(dev, apdev):
990 """D-Bus scan and related signals"""
910eb5b5 991 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
992 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
993
994 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
995
996 class TestDbusScan(TestDbus):
997 def __init__(self, bus):
998 TestDbus.__init__(self, bus)
999 self.scan_completed = 0
1000 self.bss_added = False
1d0a917f 1001 self.fail_reason = None
2ec82e67
JM
1002
1003 def __enter__(self):
1004 gobject.timeout_add(1, self.run_scan)
1005 gobject.timeout_add(15000, self.timeout)
1006 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
1007 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
1008 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
1009 self.loop.run()
1010 return self
1011
1012 def scanDone(self, success):
1013 logger.debug("scanDone: success=%s" % success)
1014 self.scan_completed += 1
1015 if self.scan_completed == 1:
1016 iface.Scan({'Type': 'passive',
1017 'AllowRoam': True,
1018 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1019 elif self.scan_completed == 2:
1020 iface.Scan({'Type': 'passive',
1021 'AllowRoam': False})
1022 elif self.bss_added and self.scan_completed == 3:
1023 self.loop.quit()
1024
1025 def bssAdded(self, bss, properties):
1026 logger.debug("bssAdded: %s" % bss)
1027 logger.debug(str(properties))
1d0a917f
JM
1028 if 'WPS' in properties:
1029 if 'Type' in properties['WPS']:
1030 self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
1031 self.loop.quit()
2ec82e67
JM
1032 self.bss_added = True
1033 if self.scan_completed == 3:
1034 self.loop.quit()
1035
1036 def bssRemoved(self, bss):
1037 logger.debug("bssRemoved: %s" % bss)
1038
1039 def run_scan(self, *args):
1040 logger.debug("run_scan")
1041 iface.Scan({'Type': 'active',
1042 'SSIDs': [ dbus.ByteArray("open"),
1043 dbus.ByteArray() ],
1044 'IEs': [ dbus.ByteArray("\xdd\x00"),
1045 dbus.ByteArray() ],
1046 'AllowRoam': False,
1047 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1048 return False
1049
1050 def success(self):
1051 return self.scan_completed == 3 and self.bss_added
1052
1053 with TestDbusScan(bus) as t:
1d0a917f
JM
1054 if t.fail_reason:
1055 raise Exception(t.fail_reason)
2ec82e67
JM
1056 if not t.success():
1057 raise Exception("Expected signals not seen")
1058
1059 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1060 dbus_interface=dbus.PROPERTIES_IFACE)
1061 if len(res) < 1:
1062 raise Exception("Scan result not in BSSs property")
1063 iface.FlushBSS(0)
1064 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1065 dbus_interface=dbus.PROPERTIES_IFACE)
1066 if len(res) != 0:
1067 raise Exception("FlushBSS() did not remove scan results from BSSs property")
1068 iface.FlushBSS(1)
1069
1070def test_dbus_scan_busy(dev, apdev):
1071 """D-Bus scan trigger rejection when busy with previous scan"""
910eb5b5 1072 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1073 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1074
1075 if "OK" not in dev[0].request("SCAN freq=2412-2462"):
1076 raise Exception("Failed to start scan")
1077 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1078 if ev is None:
1079 raise Exception("Scan start timed out")
1080
1081 try:
1082 iface.Scan({'Type': 'active', 'AllowRoam': False})
1083 raise Exception("Scan() accepted when busy")
1084 except dbus.exceptions.DBusException, e:
1085 if "ScanError: Scan request reject" not in str(e):
1086 raise Exception("Unexpected error message: " + str(e))
1087
1088 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1089 if ev is None:
1090 raise Exception("Scan timed out")
1091
1092def test_dbus_connect(dev, apdev):
1093 """D-Bus AddNetwork and connect"""
910eb5b5 1094 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1095 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1096
1097 ssid = "test-wpa2-psk"
1098 passphrase = 'qwertyuiop'
1099 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1100 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1101
1102 class TestDbusConnect(TestDbus):
1103 def __init__(self, bus):
1104 TestDbus.__init__(self, bus)
1105 self.network_added = False
1106 self.network_selected = False
1107 self.network_removed = False
1108 self.state = 0
1109
1110 def __enter__(self):
1111 gobject.timeout_add(1, self.run_connect)
1112 gobject.timeout_add(15000, self.timeout)
1113 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1114 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1115 "NetworkRemoved")
1116 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1117 "NetworkSelected")
1118 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1119 "PropertiesChanged")
1120 self.loop.run()
1121 return self
1122
1123 def networkAdded(self, network, properties):
1124 logger.debug("networkAdded: %s" % str(network))
1125 logger.debug(str(properties))
1126 self.network_added = True
1127
1128 def networkRemoved(self, network):
1129 logger.debug("networkRemoved: %s" % str(network))
1130 self.network_removed = True
1131
1132 def networkSelected(self, network):
1133 logger.debug("networkSelected: %s" % str(network))
1134 self.network_selected = True
1135
1136 def propertiesChanged(self, properties):
1137 logger.debug("propertiesChanged: %s" % str(properties))
1138 if 'State' in properties and properties['State'] == "completed":
1139 if self.state == 0:
1140 self.state = 1
1141 iface.Disconnect()
1142 elif self.state == 2:
1143 self.state = 3
1144 iface.Disconnect()
1145 elif self.state == 4:
1146 self.state = 5
1147 iface.Reattach()
1148 elif self.state == 5:
1149 self.state = 6
13b34972
JM
1150 iface.Disconnect()
1151 elif self.state == 7:
1152 self.state = 8
2ec82e67
JM
1153 res = iface.SignalPoll()
1154 logger.debug("SignalPoll: " + str(res))
1155 if 'frequency' not in res or res['frequency'] != 2412:
1156 self.state = -1
1157 logger.info("Unexpected SignalPoll result")
1158 iface.RemoveNetwork(self.netw)
1159 if 'State' in properties and properties['State'] == "disconnected":
1160 if self.state == 1:
1161 self.state = 2
1162 iface.SelectNetwork(self.netw)
1163 elif self.state == 3:
1164 self.state = 4
1165 iface.Reassociate()
1166 elif self.state == 6:
1167 self.state = 7
13b34972
JM
1168 iface.Reconnect()
1169 elif self.state == 8:
1170 self.state = 9
2ec82e67
JM
1171 self.loop.quit()
1172
1173 def run_connect(self, *args):
1174 logger.debug("run_connect")
1175 args = dbus.Dictionary({ 'ssid': ssid,
1176 'key_mgmt': 'WPA-PSK',
1177 'psk': passphrase,
1178 'scan_freq': 2412 },
1179 signature='sv')
1180 self.netw = iface.AddNetwork(args)
1181 iface.SelectNetwork(self.netw)
1182 return False
1183
1184 def success(self):
1185 if not self.network_added or \
1186 not self.network_removed or \
1187 not self.network_selected:
1188 return False
13b34972 1189 return self.state == 9
2ec82e67
JM
1190
1191 with TestDbusConnect(bus) as t:
1192 if not t.success():
1193 raise Exception("Expected signals not seen")
1194
53f4ed68
JM
1195def test_dbus_connect_psk_mem(dev, apdev):
1196 """D-Bus AddNetwork and connect with memory-only PSK"""
1197 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1198 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1199
1200 ssid = "test-wpa2-psk"
1201 passphrase = 'qwertyuiop'
1202 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1203 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1204
1205 class TestDbusConnect(TestDbus):
1206 def __init__(self, bus):
1207 TestDbus.__init__(self, bus)
1208 self.connected = False
1209
1210 def __enter__(self):
1211 gobject.timeout_add(1, self.run_connect)
1212 gobject.timeout_add(15000, self.timeout)
1213 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1214 "PropertiesChanged")
1215 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1216 "NetworkRequest")
1217 self.loop.run()
1218 return self
1219
1220 def propertiesChanged(self, properties):
1221 logger.debug("propertiesChanged: %s" % str(properties))
1222 if 'State' in properties and properties['State'] == "completed":
1223 self.connected = True
1224 self.loop.quit()
1225
1226 def networkRequest(self, path, field, txt):
1227 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1228 if field == "PSK_PASSPHRASE":
1229 iface.NetworkReply(path, field, '"' + passphrase + '"')
1230
1231 def run_connect(self, *args):
1232 logger.debug("run_connect")
1233 args = dbus.Dictionary({ 'ssid': ssid,
1234 'key_mgmt': 'WPA-PSK',
1235 'mem_only_psk': 1,
1236 'scan_freq': 2412 },
1237 signature='sv')
1238 self.netw = iface.AddNetwork(args)
1239 iface.SelectNetwork(self.netw)
1240 return False
1241
1242 def success(self):
1243 return self.connected
1244
1245 with TestDbusConnect(bus) as t:
1246 if not t.success():
1247 raise Exception("Expected signals not seen")
1248
0e126c6d
JM
1249def test_dbus_connect_oom(dev, apdev):
1250 """D-Bus AddNetwork and connect when out-of-memory"""
1251 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1252 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1253
1254 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
1255 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1256
1257 ssid = "test-wpa2-psk"
1258 passphrase = 'qwertyuiop'
1259 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1260 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1261
1262 class TestDbusConnect(TestDbus):
1263 def __init__(self, bus):
1264 TestDbus.__init__(self, bus)
1265 self.network_added = False
1266 self.network_selected = False
1267 self.network_removed = False
1268 self.state = 0
1269
1270 def __enter__(self):
1271 gobject.timeout_add(1, self.run_connect)
1272 gobject.timeout_add(1500, self.timeout)
1273 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1274 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1275 "NetworkRemoved")
1276 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1277 "NetworkSelected")
1278 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1279 "PropertiesChanged")
1280 self.loop.run()
1281 return self
1282
1283 def networkAdded(self, network, properties):
1284 logger.debug("networkAdded: %s" % str(network))
1285 logger.debug(str(properties))
1286 self.network_added = True
1287
1288 def networkRemoved(self, network):
1289 logger.debug("networkRemoved: %s" % str(network))
1290 self.network_removed = True
1291
1292 def networkSelected(self, network):
1293 logger.debug("networkSelected: %s" % str(network))
1294 self.network_selected = True
1295
1296 def propertiesChanged(self, properties):
1297 logger.debug("propertiesChanged: %s" % str(properties))
1298 if 'State' in properties and properties['State'] == "completed":
1299 if self.state == 0:
1300 self.state = 1
1301 iface.Disconnect()
1302 elif self.state == 2:
1303 self.state = 3
1304 iface.Disconnect()
1305 elif self.state == 4:
1306 self.state = 5
1307 iface.Reattach()
1308 elif self.state == 5:
1309 self.state = 6
1310 res = iface.SignalPoll()
1311 logger.debug("SignalPoll: " + str(res))
1312 if 'frequency' not in res or res['frequency'] != 2412:
1313 self.state = -1
1314 logger.info("Unexpected SignalPoll result")
1315 iface.RemoveNetwork(self.netw)
1316 if 'State' in properties and properties['State'] == "disconnected":
1317 if self.state == 1:
1318 self.state = 2
1319 iface.SelectNetwork(self.netw)
1320 elif self.state == 3:
1321 self.state = 4
1322 iface.Reassociate()
1323 elif self.state == 6:
1324 self.state = 7
1325 self.loop.quit()
1326
1327 def run_connect(self, *args):
1328 logger.debug("run_connect")
1329 args = dbus.Dictionary({ 'ssid': ssid,
1330 'key_mgmt': 'WPA-PSK',
1331 'psk': passphrase,
1332 'scan_freq': 2412 },
1333 signature='sv')
1334 try:
1335 self.netw = iface.AddNetwork(args)
1336 except Exception, e:
1337 logger.info("Exception on AddNetwork: " + str(e))
1338 self.loop.quit()
1339 return False
1340 try:
1341 iface.SelectNetwork(self.netw)
1342 except Exception, e:
1343 logger.info("Exception on SelectNetwork: " + str(e))
1344 self.loop.quit()
1345
1346 return False
1347
1348 def success(self):
1349 if not self.network_added or \
1350 not self.network_removed or \
1351 not self.network_selected:
1352 return False
1353 return self.state == 7
1354
1355 count = 0
1356 for i in range(1, 1000):
1357 for j in range(3):
1358 dev[j].dump_monitor()
1359 dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
1360 try:
1361 with TestDbusConnect(bus) as t:
1362 if not t.success():
1363 logger.info("Iteration %d - Expected signals not seen" % i)
1364 else:
1365 logger.info("Iteration %d - success" % i)
1366
1367 state = dev[0].request('GET_ALLOC_FAIL')
1368 logger.info("GET_ALLOC_FAIL: " + state)
1369 dev[0].dump_monitor()
1370 dev[0].request("TEST_ALLOC_FAIL 0:")
1371 if i < 3:
1372 raise Exception("Connection succeeded during out-of-memory")
1373 if not state.startswith('0:'):
1374 count += 1
1375 if count == 5:
1376 break
1377 except:
1378 pass
2ec82e67 1379
89a79ca2
JM
1380 # Force regulatory update to re-fetch hw capabilities for the following
1381 # test cases.
1382 try:
1383 dev[0].dump_monitor()
1384 subprocess.call(['iw', 'reg', 'set', 'US'])
1385 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1386 finally:
1387 dev[0].dump_monitor()
1388 subprocess.call(['iw', 'reg', 'set', '00'])
1389 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1390
2ec82e67
JM
1391def test_dbus_while_not_connected(dev, apdev):
1392 """D-Bus invalid operations while not connected"""
910eb5b5 1393 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1394 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1395
1396 try:
1397 iface.Disconnect()
1398 raise Exception("Disconnect() accepted when not connected")
1399 except dbus.exceptions.DBusException, e:
1400 if "NotConnected" not in str(e):
1401 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
1402
1403 try:
1404 iface.Reattach()
1405 raise Exception("Reattach() accepted when not connected")
1406 except dbus.exceptions.DBusException, e:
1407 if "NotConnected" not in str(e):
1408 raise Exception("Unexpected error message for invalid Reattach: " + str(e))
1409
1410def test_dbus_connect_eap(dev, apdev):
1411 """D-Bus AddNetwork and connect to EAP network"""
089e7ca3 1412 check_altsubject_match_support(dev[0])
910eb5b5 1413 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1414 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1415
1416 ssid = "ieee8021x-open"
1417 params = hostapd.radius_params()
1418 params["ssid"] = ssid
1419 params["ieee8021x"] = "1"
1420 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1421
1422 class TestDbusConnect(TestDbus):
1423 def __init__(self, bus):
1424 TestDbus.__init__(self, bus)
1425 self.certification_received = False
1426 self.eap_status = False
1427 self.state = 0
1428
1429 def __enter__(self):
1430 gobject.timeout_add(1, self.run_connect)
1431 gobject.timeout_add(15000, self.timeout)
1432 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1433 "PropertiesChanged")
1434 self.add_signal(self.certification, WPAS_DBUS_IFACE,
2099fed4 1435 "Certification", byte_arrays=True)
2ec82e67
JM
1436 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1437 "NetworkRequest")
1438 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
1439 self.loop.run()
1440 return self
1441
1442 def propertiesChanged(self, properties):
1443 logger.debug("propertiesChanged: %s" % str(properties))
1444 if 'State' in properties and properties['State'] == "completed":
1445 if self.state == 0:
1446 self.state = 1
1447 iface.EAPLogoff()
2099fed4
JM
1448 logger.info("Set dNSName constraint")
1449 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1450 args = dbus.Dictionary({ 'altsubject_match':
1451 self.server_dnsname },
1452 signature='sv')
1453 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1454 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
1455 elif self.state == 2:
1456 self.state = 3
2099fed4
JM
1457 iface.Disconnect()
1458 logger.info("Set non-matching dNSName constraint")
1459 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1460 args = dbus.Dictionary({ 'altsubject_match':
1461 self.server_dnsname + "FOO" },
1462 signature='sv')
1463 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1464 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
1465 if 'State' in properties and properties['State'] == "disconnected":
1466 if self.state == 1:
1467 self.state = 2
1468 iface.EAPLogon()
1469 iface.SelectNetwork(self.netw)
2099fed4
JM
1470 if self.state == 3:
1471 self.state = 4
1472 iface.SelectNetwork(self.netw)
2ec82e67
JM
1473
1474 def certification(self, args):
1475 logger.debug("certification: %s" % str(args))
1476 self.certification_received = True
2099fed4
JM
1477 if args['depth'] == 0:
1478 # The test server certificate is supposed to have dNSName
1479 if len(args['altsubject']) < 1:
1480 raise Exception("Missing dNSName")
1481 dnsname = args['altsubject'][0]
1482 if not dnsname.startswith("DNS:"):
1483 raise Exception("Expected dNSName not found: " + dnsname)
1484 logger.info("altsubject: " + dnsname)
1485 self.server_dnsname = dnsname
2ec82e67
JM
1486
1487 def eap(self, status, parameter):
1488 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
1489 if status == 'completion' and parameter == 'success':
1490 self.eap_status = True
2099fed4
JM
1491 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch':
1492 self.state = 5
1493 self.loop.quit()
2ec82e67
JM
1494
1495 def networkRequest(self, path, field, txt):
1496 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1497 if field == "PASSWORD":
1498 iface.NetworkReply(path, field, "password")
1499
1500 def run_connect(self, *args):
1501 logger.debug("run_connect")
1502 args = dbus.Dictionary({ 'ssid': ssid,
1503 'key_mgmt': 'IEEE8021X',
1504 'eapol_flags': 0,
1505 'eap': 'TTLS',
1506 'anonymous_identity': 'ttls',
1507 'identity': 'pap user',
1508 'ca_cert': 'auth_serv/ca.pem',
1509 'phase2': 'auth=PAP',
1510 'scan_freq': 2412 },
1511 signature='sv')
1512 self.netw = iface.AddNetwork(args)
1513 iface.SelectNetwork(self.netw)
1514 return False
1515
1516 def success(self):
1517 if not self.eap_status or not self.certification_received:
1518 return False
2099fed4 1519 return self.state == 5
2ec82e67
JM
1520
1521 with TestDbusConnect(bus) as t:
1522 if not t.success():
1523 raise Exception("Expected signals not seen")
1524
1525def test_dbus_network(dev, apdev):
1526 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
910eb5b5 1527 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1528 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1529
1530 args = dbus.Dictionary({ 'ssid': "foo",
1531 'key_mgmt': 'WPA-PSK',
1532 'psk': "12345678",
1533 'identity': dbus.ByteArray([ 1, 2 ]),
1534 'priority': dbus.Int32(0),
1535 'scan_freq': dbus.UInt32(2412) },
1536 signature='sv')
1537 netw = iface.AddNetwork(args)
5a233fbd
JM
1538 id = int(dev[0].list_networks()[0]['id'])
1539 val = dev[0].get_network(id, "scan_freq")
1540 if val != "2412":
1541 raise Exception("Invalid scan_freq value: " + str(val))
1542 iface.RemoveNetwork(netw)
1543
1544 args = dbus.Dictionary({ 'ssid': "foo",
1545 'key_mgmt': 'NONE',
1546 'scan_freq': "2412 2432",
1547 'freq_list': "2412 2417 2432" },
1548 signature='sv')
1549 netw = iface.AddNetwork(args)
1550 id = int(dev[0].list_networks()[0]['id'])
1551 val = dev[0].get_network(id, "scan_freq")
1552 if val != "2412 2432":
1553 raise Exception("Invalid scan_freq value (2): " + str(val))
1554 val = dev[0].get_network(id, "freq_list")
1555 if val != "2412 2417 2432":
1556 raise Exception("Invalid freq_list value: " + str(val))
2ec82e67
JM
1557 iface.RemoveNetwork(netw)
1558 try:
1559 iface.RemoveNetwork(netw)
1560 raise Exception("Invalid RemoveNetwork() accepted")
1561 except dbus.exceptions.DBusException, e:
1562 if "NetworkUnknown" not in str(e):
1563 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1564 try:
1565 iface.SelectNetwork(netw)
1566 raise Exception("Invalid SelectNetwork() accepted")
1567 except dbus.exceptions.DBusException, e:
1568 if "NetworkUnknown" not in str(e):
1569 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1570
1571 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1572 'identity': "testuser", 'scan_freq': '2412' },
1573 signature='sv')
1574 netw1 = iface.AddNetwork(args)
1575 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1576 signature='sv')
1577 netw2 = iface.AddNetwork(args)
1578 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1579 dbus_interface=dbus.PROPERTIES_IFACE)
1580 if len(res) != 2:
1581 raise Exception("Unexpected number of networks")
1582
1583 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1584 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1585 dbus_interface=dbus.PROPERTIES_IFACE)
1586 if res != False:
1587 raise Exception("Added network was unexpectedly enabled by default")
1588 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
1589 dbus_interface=dbus.PROPERTIES_IFACE)
1590 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1591 dbus_interface=dbus.PROPERTIES_IFACE)
1592 if res != True:
1593 raise Exception("Set(Enabled,True) did not seem to change property value")
1594 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
1595 dbus_interface=dbus.PROPERTIES_IFACE)
1596 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1597 dbus_interface=dbus.PROPERTIES_IFACE)
1598 if res != False:
1599 raise Exception("Set(Enabled,False) did not seem to change property value")
1600 try:
1601 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
1602 dbus_interface=dbus.PROPERTIES_IFACE)
1603 raise Exception("Invalid Set(Enabled,1) accepted")
1604 except dbus.exceptions.DBusException, e:
1605 if "Error.Failed: wrong property type" not in str(e):
1606 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
1607
1608 args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
1609 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1610 dbus_interface=dbus.PROPERTIES_IFACE)
1611 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1612 dbus_interface=dbus.PROPERTIES_IFACE)
1613 if res['ssid'] != '"foo1new"':
1614 raise Exception("Set(Properties) failed to update ssid")
1615 if res['identity'] != '"testuser"':
1616 raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1617
1618 iface.RemoveAllNetworks()
1619 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1620 dbus_interface=dbus.PROPERTIES_IFACE)
1621 if len(res) != 0:
1622 raise Exception("Unexpected number of networks")
1623 iface.RemoveAllNetworks()
1624
1625 tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
1626 dbus.Dictionary({ 'identity': dbus.ByteArray() },
1627 signature='sv'),
1628 dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
1629 dbus.Dictionary({ 'identity': "" }, signature='sv') ]
1630 for args in tests:
1631 try:
1632 iface.AddNetwork(args)
1633 raise Exception("Invalid AddNetwork args accepted: " + str(args))
1634 except dbus.exceptions.DBusException, e:
1635 if "InvalidArgs" not in str(e):
1636 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1637
0e126c6d
JM
1638def test_dbus_network_oom(dev, apdev):
1639 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1640 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1641 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1642
1643 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1644 'identity': "testuser", 'scan_freq': '2412' },
1645 signature='sv')
1646 netw1 = iface.AddNetwork(args)
1647 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1648
1649 with alloc_fail_dbus(dev[0], 1,
1650 "wpa_config_get_all;wpas_dbus_getter_network_properties",
1651 "Get"):
1652 net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1653 dbus_interface=dbus.PROPERTIES_IFACE)
1654
1655 iface.RemoveAllNetworks()
1656
1657 with alloc_fail_dbus(dev[0], 1,
1658 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1659 "RemoveNetwork", "InvalidArgs"):
1660 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1661
1662 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1663 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1664 signature='sv')
1665 try:
1666 netw = iface.AddNetwork(args)
1667 # Currently, AddNetwork() succeeds even if os_strdup() for path
1668 # fails, so remove the network if that occurs.
1669 iface.RemoveNetwork(netw)
1670 except dbus.exceptions.DBusException, e:
1671 pass
1672
1673 for i in range(1, 3):
1674 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
1675 try:
1676 netw = iface.AddNetwork(args)
1677 # Currently, AddNetwork() succeeds even if network registration
1678 # fails, so remove the network if that occurs.
1679 iface.RemoveNetwork(netw)
1680 except dbus.exceptions.DBusException, e:
1681 pass
1682
1683 with alloc_fail_dbus(dev[0], 1,
1684 "=wpa_config_add_network;wpas_dbus_handler_add_network",
1685 "AddNetwork",
1686 "UnknownError: wpa_supplicant could not add a network"):
1687 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1688 signature='sv')
1689 netw = iface.AddNetwork(args)
1690
1691 tests = [ (1,
1692 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1693 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1694 signature='sv')),
1695 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1696 dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
1697 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1698 dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
1699 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1700 dbus.Dictionary({ 'priority': dbus.UInt32(1) },
1701 signature='sv')),
1702 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1703 dbus.Dictionary({ 'priority': dbus.Int32(1) },
1704 signature='sv')),
1705 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1706 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1707 signature='sv')) ]
1708 for (count,funcs,args) in tests:
1709 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
1710 netw = iface.AddNetwork(args)
1711
1712 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
1713 dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
1714 raise Exception("Unexpected network block added")
1715 if len(dev[0].list_networks()) > 0:
1716 raise Exception("Unexpected network block visible")
1717
2ec82e67
JM
1718def test_dbus_interface(dev, apdev):
1719 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
7966674d
JM
1720 try:
1721 _test_dbus_interface(dev, apdev)
1722 finally:
1723 # Need to force P2P channel list update since the 'lo' interface
1724 # with driver=none ends up configuring default dualband channels.
1725 dev[0].request("SET country US")
1726 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1727 if ev is None:
1728 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1729 timeout=1)
1730 dev[0].request("SET country 00")
1731 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1732 if ev is None:
1733 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1734 timeout=1)
1735 subprocess.call(['iw', 'reg', 'set', '00'])
1736
1737def _test_dbus_interface(dev, apdev):
910eb5b5 1738 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1739 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1740
1741 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1742 signature='sv')
1743 path = wpas.CreateInterface(params)
1744 logger.debug("New interface path: " + str(path))
1745 path2 = wpas.GetInterface("lo")
1746 if path != path2:
1747 raise Exception("Interface object mismatch")
1748
1749 params = dbus.Dictionary({ 'Ifname': 'lo',
1750 'Driver': 'none',
1751 'ConfigFile': 'foo',
1752 'BridgeIfname': 'foo', },
1753 signature='sv')
1754 try:
1755 wpas.CreateInterface(params)
1756 raise Exception("Invalid CreateInterface() accepted")
1757 except dbus.exceptions.DBusException, e:
1758 if "InterfaceExists" not in str(e):
1759 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1760
1761 wpas.RemoveInterface(path)
1762 try:
1763 wpas.RemoveInterface(path)
1764 raise Exception("Invalid RemoveInterface() accepted")
1765 except dbus.exceptions.DBusException, e:
1766 if "InterfaceUnknown" not in str(e):
1767 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1768
1769 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
1770 'Foo': 123 },
1771 signature='sv')
1772 try:
1773 wpas.CreateInterface(params)
1774 raise Exception("Invalid CreateInterface() accepted")
1775 except dbus.exceptions.DBusException, e:
1776 if "InvalidArgs" not in str(e):
1777 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1778
1779 params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
1780 try:
1781 wpas.CreateInterface(params)
1782 raise Exception("Invalid CreateInterface() accepted")
1783 except dbus.exceptions.DBusException, e:
1784 if "InvalidArgs" not in str(e):
1785 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1786
1787 try:
1788 wpas.GetInterface("lo")
1789 raise Exception("Invalid GetInterface() accepted")
1790 except dbus.exceptions.DBusException, e:
1791 if "InterfaceUnknown" not in str(e):
1792 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1793
0e126c6d
JM
1794def test_dbus_interface_oom(dev, apdev):
1795 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
1796 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1797 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1798
1799 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
1800 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1801 signature='sv')
1802 wpas.CreateInterface(params)
1803
1804 for i in range(1, 1000):
1805 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
1806 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1807 signature='sv')
1808 try:
1809 npath = wpas.CreateInterface(params)
1810 wpas.RemoveInterface(npath)
1811 logger.info("CreateInterface succeeds after %d allocation failures" % i)
1812 state = dev[0].request('GET_ALLOC_FAIL')
1813 logger.info("GET_ALLOC_FAIL: " + state)
1814 dev[0].dump_monitor()
1815 dev[0].request("TEST_ALLOC_FAIL 0:")
1816 if i < 5:
1817 raise Exception("CreateInterface succeeded during out-of-memory")
1818 if not state.startswith('0:'):
1819 break
1820 except dbus.exceptions.DBusException, e:
1821 pass
1822
1823 for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
1824 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
1825 "CreateInterface"):
1826 params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
1827 wpas.CreateInterface(params)
1828
2ec82e67
JM
1829def test_dbus_blob(dev, apdev):
1830 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
910eb5b5 1831 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1832 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1833
1834 blob = dbus.ByteArray("\x01\x02\x03")
1835 iface.AddBlob('blob1', blob)
1836 try:
1837 iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
1838 raise Exception("Invalid AddBlob() accepted")
1839 except dbus.exceptions.DBusException, e:
1840 if "BlobExists" not in str(e):
1841 raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
1842 res = iface.GetBlob('blob1')
1843 if len(res) != len(blob):
1844 raise Exception("Unexpected blob data length")
1845 for i in range(len(res)):
1846 if res[i] != dbus.Byte(blob[i]):
1847 raise Exception("Unexpected blob data")
1848 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
1849 dbus_interface=dbus.PROPERTIES_IFACE)
1850 if 'blob1' not in res:
1851 raise Exception("Added blob missing from Blobs property")
1852 iface.RemoveBlob('blob1')
1853 try:
1854 iface.RemoveBlob('blob1')
1855 raise Exception("Invalid RemoveBlob() accepted")
1856 except dbus.exceptions.DBusException, e:
1857 if "BlobUnknown" not in str(e):
1858 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
1859 try:
1860 iface.GetBlob('blob1')
1861 raise Exception("Invalid GetBlob() accepted")
1862 except dbus.exceptions.DBusException, e:
1863 if "BlobUnknown" not in str(e):
1864 raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
1865
1866 class TestDbusBlob(TestDbus):
1867 def __init__(self, bus):
1868 TestDbus.__init__(self, bus)
1869 self.blob_added = False
1870 self.blob_removed = False
1871
1872 def __enter__(self):
1873 gobject.timeout_add(1, self.run_blob)
1874 gobject.timeout_add(15000, self.timeout)
1875 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
1876 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
1877 self.loop.run()
1878 return self
1879
1880 def blobAdded(self, blobName):
1881 logger.debug("blobAdded: %s" % blobName)
1882 if blobName == 'blob2':
1883 self.blob_added = True
1884
1885 def blobRemoved(self, blobName):
1886 logger.debug("blobRemoved: %s" % blobName)
1887 if blobName == 'blob2':
1888 self.blob_removed = True
1889 self.loop.quit()
1890
1891 def run_blob(self, *args):
1892 logger.debug("run_blob")
1893 iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
1894 iface.RemoveBlob('blob2')
1895 return False
1896
1897 def success(self):
1898 return self.blob_added and self.blob_removed
1899
1900 with TestDbusBlob(bus) as t:
1901 if not t.success():
1902 raise Exception("Expected signals not seen")
1903
0e126c6d
JM
1904def test_dbus_blob_oom(dev, apdev):
1905 """D-Bus AddNetwork/RemoveNetwork OOM error cases"""
1906 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1907 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1908
1909 for i in range(1, 4):
1910 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
1911 "AddBlob"):
1912 iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
1913
2ec82e67
JM
1914def test_dbus_autoscan(dev, apdev):
1915 """D-Bus Autoscan()"""
910eb5b5 1916 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1917 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1918
1919 iface.AutoScan("foo")
1920 iface.AutoScan("periodic:1")
1921 iface.AutoScan("")
1922 dev[0].request("AUTOSCAN ")
1923
0e126c6d
JM
1924def test_dbus_autoscan_oom(dev, apdev):
1925 """D-Bus Autoscan() OOM"""
1926 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1927 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1928
1929 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
1930 iface.AutoScan("foo")
1931 dev[0].request("AUTOSCAN ")
1932
2ec82e67
JM
1933def test_dbus_tdls_invalid(dev, apdev):
1934 """D-Bus invalid TDLS operations"""
910eb5b5 1935 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1936 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1937
1938 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
1939 connect_2sta_open(dev, hapd)
1940 addr1 = dev[1].p2p_interface_addr()
1941
1942 try:
1943 iface.TDLSDiscover("foo")
1944 raise Exception("Invalid TDLSDiscover() accepted")
1945 except dbus.exceptions.DBusException, e:
1946 if "InvalidArgs" not in str(e):
1947 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e))
1948
1949 try:
1950 iface.TDLSStatus("foo")
1951 raise Exception("Invalid TDLSStatus() accepted")
1952 except dbus.exceptions.DBusException, e:
1953 if "InvalidArgs" not in str(e):
1954 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e))
1955
1956 res = iface.TDLSStatus(addr1)
1957 if res != "peer does not exist":
1958 raise Exception("Unexpected TDLSStatus response")
1959
1960 try:
1961 iface.TDLSSetup("foo")
1962 raise Exception("Invalid TDLSSetup() accepted")
1963 except dbus.exceptions.DBusException, e:
1964 if "InvalidArgs" not in str(e):
1965 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e))
1966
1967 try:
1968 iface.TDLSTeardown("foo")
1969 raise Exception("Invalid TDLSTeardown() accepted")
1970 except dbus.exceptions.DBusException, e:
1971 if "InvalidArgs" not in str(e):
1972 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e))
1973
795b6f57
JM
1974 try:
1975 iface.TDLSTeardown("00:11:22:33:44:55")
1976 raise Exception("TDLSTeardown accepted for unknown peer")
1977 except dbus.exceptions.DBusException, e:
1978 if "UnknownError: error performing TDLS teardown" not in str(e):
1979 raise Exception("Unexpected error message: " + str(e))
1980
0e126c6d
JM
1981def test_dbus_tdls_oom(dev, apdev):
1982 """D-Bus TDLS operations during OOM"""
1983 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1984 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1985
1986 with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
1987 "UnknownError: error performing TDLS setup"):
1988 iface.TDLSSetup("00:11:22:33:44:55")
1989
2ec82e67
JM
1990def test_dbus_tdls(dev, apdev):
1991 """D-Bus TDLS"""
910eb5b5 1992 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1993 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1994
1995 hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
1996 connect_2sta_open(dev, hapd)
1997
1998 addr1 = dev[1].p2p_interface_addr()
1999
2000 class TestDbusTdls(TestDbus):
2001 def __init__(self, bus):
2002 TestDbus.__init__(self, bus)
2003 self.tdls_setup = False
2004 self.tdls_teardown = False
2005
2006 def __enter__(self):
2007 gobject.timeout_add(1, self.run_tdls)
2008 gobject.timeout_add(15000, self.timeout)
2009 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2010 "PropertiesChanged")
2011 self.loop.run()
2012 return self
2013
2014 def propertiesChanged(self, properties):
2015 logger.debug("propertiesChanged: %s" % str(properties))
2016
2017 def run_tdls(self, *args):
2018 logger.debug("run_tdls")
2019 iface.TDLSDiscover(addr1)
2020 gobject.timeout_add(100, self.run_tdls2)
2021 return False
2022
2023 def run_tdls2(self, *args):
2024 logger.debug("run_tdls2")
2025 iface.TDLSSetup(addr1)
2026 gobject.timeout_add(500, self.run_tdls3)
2027 return False
2028
2029 def run_tdls3(self, *args):
2030 logger.debug("run_tdls3")
2031 res = iface.TDLSStatus(addr1)
2032 if res == "connected":
2033 self.tdls_setup = True
2034 else:
2035 logger.info("Unexpected TDLSStatus: " + res)
2036 iface.TDLSTeardown(addr1)
2037 gobject.timeout_add(200, self.run_tdls4)
2038 return False
2039
2040 def run_tdls4(self, *args):
2041 logger.debug("run_tdls4")
2042 res = iface.TDLSStatus(addr1)
2043 if res == "peer does not exist":
2044 self.tdls_teardown = True
2045 else:
2046 logger.info("Unexpected TDLSStatus: " + res)
2047 self.loop.quit()
2048 return False
2049
2050 def success(self):
2051 return self.tdls_setup and self.tdls_teardown
2052
2053 with TestDbusTdls(bus) as t:
2054 if not t.success():
2055 raise Exception("Expected signals not seen")
2056
2057def test_dbus_pkcs11(dev, apdev):
2058 """D-Bus SetPKCS11EngineAndModulePath()"""
910eb5b5 2059 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2060 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2061
2062 try:
2063 iface.SetPKCS11EngineAndModulePath("foo", "bar")
2064 except dbus.exceptions.DBusException, e:
2065 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2066 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2067
2068 try:
2069 iface.SetPKCS11EngineAndModulePath("foo", "")
2070 except dbus.exceptions.DBusException, e:
2071 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2072 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2073
2074 iface.SetPKCS11EngineAndModulePath("", "bar")
2075 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2076 dbus_interface=dbus.PROPERTIES_IFACE)
2077 if res != "":
2078 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2079 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2080 dbus_interface=dbus.PROPERTIES_IFACE)
2081 if res != "bar":
2082 raise Exception("Unexpected PKCS11ModulePath value: " + res)
2083
2084 iface.SetPKCS11EngineAndModulePath("", "")
2085 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2086 dbus_interface=dbus.PROPERTIES_IFACE)
2087 if res != "":
2088 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2089 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2090 dbus_interface=dbus.PROPERTIES_IFACE)
2091 if res != "":
2092 raise Exception("Unexpected PKCS11ModulePath value: " + res)
2093
2094def test_dbus_apscan(dev, apdev):
2095 """D-Bus Get/Set ApScan"""
2096 try:
81e787b7 2097 _test_dbus_apscan(dev, apdev)
2ec82e67
JM
2098 finally:
2099 dev[0].request("AP_SCAN 1")
2100
2101def _test_dbus_apscan(dev, apdev):
910eb5b5 2102 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2103
2104 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2105 dbus_interface=dbus.PROPERTIES_IFACE)
2106 if res != 1:
2107 raise Exception("Unexpected initial ApScan value: %d" % res)
2108
2109 for i in range(3):
2110 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i),
2111 dbus_interface=dbus.PROPERTIES_IFACE)
2112 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2113 dbus_interface=dbus.PROPERTIES_IFACE)
2114 if res != i:
2115 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i))
2116
2117 try:
2118 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
2119 dbus_interface=dbus.PROPERTIES_IFACE)
2120 raise Exception("Invalid Set(ApScan,-1) accepted")
2121 except dbus.exceptions.DBusException, e:
2122 if "Error.Failed: wrong property type" not in str(e):
2123 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2124
2125 try:
2126 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
2127 dbus_interface=dbus.PROPERTIES_IFACE)
2128 raise Exception("Invalid Set(ApScan,123) accepted")
2129 except dbus.exceptions.DBusException, e:
2130 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
2131 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
2132
2133 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
2134 dbus_interface=dbus.PROPERTIES_IFACE)
2135
2136def test_dbus_fastreauth(dev, apdev):
2137 """D-Bus Get/Set FastReauth"""
910eb5b5 2138 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2139
2140 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2141 dbus_interface=dbus.PROPERTIES_IFACE)
2142 if res != True:
2143 raise Exception("Unexpected initial FastReauth value: " + str(res))
2144
2145 for i in [ False, True ]:
2146 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
2147 dbus_interface=dbus.PROPERTIES_IFACE)
2148 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2149 dbus_interface=dbus.PROPERTIES_IFACE)
2150 if res != i:
2151 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))
2152
2153 try:
2154 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
2155 dbus_interface=dbus.PROPERTIES_IFACE)
2156 raise Exception("Invalid Set(FastReauth,-1) accepted")
2157 except dbus.exceptions.DBusException, e:
2158 if "Error.Failed: wrong property type" not in str(e):
2159 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2160
2161 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
2162 dbus_interface=dbus.PROPERTIES_IFACE)
2163
2164def test_dbus_bss_expire(dev, apdev):
2165 """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
910eb5b5 2166 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2167
2168 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179),
2169 dbus_interface=dbus.PROPERTIES_IFACE)
2170 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
2171 dbus_interface=dbus.PROPERTIES_IFACE)
2172 if res != 179:
2173 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i))
2174
2175 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3),
2176 dbus_interface=dbus.PROPERTIES_IFACE)
2177 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
2178 dbus_interface=dbus.PROPERTIES_IFACE)
2179 if res != 3:
2180 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i))
2181
2182 try:
2183 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
2184 dbus_interface=dbus.PROPERTIES_IFACE)
2185 raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
2186 except dbus.exceptions.DBusException, e:
2187 if "Error.Failed: wrong property type" not in str(e):
2188 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))
2189
2190 try:
2191 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
2192 dbus_interface=dbus.PROPERTIES_IFACE)
2193 raise Exception("Invalid Set(BSSExpireAge,9) accepted")
2194 except dbus.exceptions.DBusException, e:
2195 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
2196 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))
2197
2198 try:
2199 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
2200 dbus_interface=dbus.PROPERTIES_IFACE)
2201 raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
2202 except dbus.exceptions.DBusException, e:
2203 if "Error.Failed: wrong property type" not in str(e):
2204 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))
2205
2206 try:
2207 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
2208 dbus_interface=dbus.PROPERTIES_IFACE)
2209 raise Exception("Invalid Set(BSSExpireCount,0) accepted")
2210 except dbus.exceptions.DBusException, e:
2211 if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
2212 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))
2213
2214 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
2215 dbus_interface=dbus.PROPERTIES_IFACE)
2216 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
2217 dbus_interface=dbus.PROPERTIES_IFACE)
2218
2219def test_dbus_country(dev, apdev):
2220 """D-Bus Get/Set Country"""
2221 try:
81e787b7 2222 _test_dbus_country(dev, apdev)
2ec82e67
JM
2223 finally:
2224 dev[0].request("SET country 00")
2225 subprocess.call(['iw', 'reg', 'set', '00'])
2226
2227def _test_dbus_country(dev, apdev):
910eb5b5 2228 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2229
2230 # work around issues with possible pending regdom event from the end of
2231 # the previous test case
2232 time.sleep(0.2)
2233 dev[0].dump_monitor()
2234
2235 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
2236 dbus_interface=dbus.PROPERTIES_IFACE)
2237 res = if_obj.Get(WPAS_DBUS_IFACE, "Country",
2238 dbus_interface=dbus.PROPERTIES_IFACE)
2239 if res != "FI":
2240 raise Exception("Unexpected Country value %s (expected FI)" % res)
2241
2242 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2243 if ev is None:
cb346b49
JM
2244 # For now, work around separate P2P Device interface event delivery
2245 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2246 if ev is None:
2247 raise Exception("regdom change event not seen")
2ec82e67
JM
2248 if "init=USER type=COUNTRY alpha2=FI" not in ev:
2249 raise Exception("Unexpected event contents: " + ev)
2250
2251 try:
2252 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
2253 dbus_interface=dbus.PROPERTIES_IFACE)
2254 raise Exception("Invalid Set(Country,-1) accepted")
2255 except dbus.exceptions.DBusException, e:
2256 if "Error.Failed: wrong property type" not in str(e):
2257 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
2258
2259 try:
2260 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
2261 dbus_interface=dbus.PROPERTIES_IFACE)
2262 raise Exception("Invalid Set(Country,F) accepted")
2263 except dbus.exceptions.DBusException, e:
2264 if "Error.Failed: invalid country code" not in str(e):
2265 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
2266
2267 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
2268 dbus_interface=dbus.PROPERTIES_IFACE)
2269
2270 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2271 if ev is None:
cb346b49
JM
2272 # For now, work around separate P2P Device interface event delivery
2273 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2274 if ev is None:
2275 raise Exception("regdom change event not seen")
2ec82e67
JM
2276 if "init=CORE type=WORLD" not in ev:
2277 raise Exception("Unexpected event contents: " + ev)
2278
2279def test_dbus_scan_interval(dev, apdev):
2280 """D-Bus Get/Set ScanInterval"""
2281 try:
2282 _test_dbus_scan_interval(dev, apdev)
2283 finally:
2284 dev[0].request("SCAN_INTERVAL 5")
2285
2286def _test_dbus_scan_interval(dev, apdev):
910eb5b5 2287 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2288
2289 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3),
2290 dbus_interface=dbus.PROPERTIES_IFACE)
2291 res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
2292 dbus_interface=dbus.PROPERTIES_IFACE)
2293 if res != 3:
2294 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i))
2295
2296 try:
2297 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
2298 dbus_interface=dbus.PROPERTIES_IFACE)
2299 raise Exception("Invalid Set(ScanInterval,100) accepted")
2300 except dbus.exceptions.DBusException, e:
2301 if "Error.Failed: wrong property type" not in str(e):
2302 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
2303
2304 try:
2305 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
2306 dbus_interface=dbus.PROPERTIES_IFACE)
2307 raise Exception("Invalid Set(ScanInterval,-1) accepted")
2308 except dbus.exceptions.DBusException, e:
2309 if "Error.Failed: scan_interval must be >= 0" not in str(e):
2310 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
2311
2312 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
2313 dbus_interface=dbus.PROPERTIES_IFACE)
2314
2315def test_dbus_probe_req_reporting(dev, apdev):
2316 """D-Bus Probe Request reporting"""
910eb5b5 2317 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 2318
2ec82e67
JM
2319 dev[1].p2p_find(social=True)
2320
2321 class TestDbusProbe(TestDbus):
2322 def __init__(self, bus):
2323 TestDbus.__init__(self, bus)
2324 self.reported = False
2325
2326 def __enter__(self):
2327 gobject.timeout_add(1, self.run_test)
2328 gobject.timeout_add(15000, self.timeout)
cb346b49
JM
2329 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
2330 "GroupStarted")
2ec82e67
JM
2331 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
2332 byte_arrays=True)
2ec82e67
JM
2333 self.loop.run()
2334 return self
2335
cb346b49
JM
2336 def groupStarted(self, properties):
2337 logger.debug("groupStarted: " + str(properties))
2338 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
2339 properties['interface_object'])
2340 self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
2341 self.iface.SubscribeProbeReq()
2342 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2343
2ec82e67
JM
2344 def probeRequest(self, args):
2345 logger.debug("probeRequest: args=%s" % str(args))
2346 self.reported = True
2347 self.loop.quit()
2348
2349 def run_test(self, *args):
2350 logger.debug("run_test")
cb346b49
JM
2351 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2352 params = dbus.Dictionary({ 'frequency': 2412 })
2353 p2p.GroupAdd(params)
2ec82e67
JM
2354 return False
2355
2356 def success(self):
2357 return self.reported
2358
2359 with TestDbusProbe(bus) as t:
2360 if not t.success():
2361 raise Exception("Expected signals not seen")
cb346b49
JM
2362 t.iface.UnsubscribeProbeReq()
2363 try:
2364 t.iface.UnsubscribeProbeReq()
2365 raise Exception("Invalid UnsubscribeProbeReq() accepted")
2366 except dbus.exceptions.DBusException, e:
2367 if "NoSubscription" not in str(e):
2368 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
2369 t.group_p2p.Disconnect()
2ec82e67
JM
2370
2371 with TestDbusProbe(bus) as t:
2372 if not t.success():
2373 raise Exception("Expected signals not seen")
2374 # On purpose, leave ProbeReq subscription in place to test automatic
2375 # cleanup.
2376
2377 dev[1].p2p_stop_find()
2ec82e67 2378
0e126c6d
JM
2379def test_dbus_probe_req_reporting_oom(dev, apdev):
2380 """D-Bus Probe Request reporting (OOM)"""
2381 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2382 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2383
fb9adae4
JM
2384 # Need to make sure this process has not already subscribed to avoid false
2385 # failures due to the operation succeeding due to os_strdup() not even
2386 # getting called.
2387 try:
2388 iface.UnsubscribeProbeReq()
2389 was_subscribed = True
2390 except dbus.exceptions.DBusException, e:
2391 was_subscribed = False
2392 pass
2393
0e126c6d
JM
2394 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
2395 "SubscribeProbeReq"):
2396 iface.SubscribeProbeReq()
2397
fb9adae4
JM
2398 if was_subscribed:
2399 # On purpose, leave ProbeReq subscription in place to test automatic
2400 # cleanup.
2401 iface.SubscribeProbeReq()
2402
2ec82e67
JM
2403def test_dbus_p2p_invalid(dev, apdev):
2404 """D-Bus invalid P2P operations"""
910eb5b5 2405 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2406 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2407
2408 try:
2409 p2p.RejectPeer(path + "/Peers/00112233445566")
2410 raise Exception("Invalid RejectPeer accepted")
2411 except dbus.exceptions.DBusException, e:
2412 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2413 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2414
2415 try:
2416 p2p.RejectPeer("/foo")
2417 raise Exception("Invalid RejectPeer accepted")
2418 except dbus.exceptions.DBusException, e:
2419 if "InvalidArgs" not in str(e):
2420 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2421
001c4bf5
JM
2422 tests = [ { },
2423 { 'peer': 'foo' },
2424 { 'foo': "bar" },
2425 { 'iface': "abc" },
2426 { 'iface': 123 } ]
2427 for t in tests:
2428 try:
2429 p2p.RemoveClient(t)
2430 raise Exception("Invalid RemoveClient accepted")
2431 except dbus.exceptions.DBusException, e:
2432 if "InvalidArgs" not in str(e):
2433 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
2434
2ec82e67
JM
2435 tests = [ {'DiscoveryType': 'foo'},
2436 {'RequestedDeviceTypes': 'foo'},
2437 {'RequestedDeviceTypes': ['foo']},
2438 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2439 '10','11','12','13','14','15','16',
2440 '17']},
2441 {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2442 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2443 {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2444 {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
2445 dbus.ByteArray('1234567')]},
2446 {'Foo': dbus.Int16(1)},
2447 {'Foo': dbus.UInt16(1)},
2448 {'Foo': dbus.Int64(1)},
2449 {'Foo': dbus.UInt64(1)},
2450 {'Foo': dbus.Double(1.23)},
2451 {'Foo': dbus.Signature('s')},
2452 {'Foo': 'bar'}]
2453 for t in tests:
2454 try:
2455 p2p.Find(dbus.Dictionary(t))
2456 raise Exception("Invalid Find accepted")
2457 except dbus.exceptions.DBusException, e:
2458 if "InvalidArgs" not in str(e):
2459 raise Exception("Unexpected error message for invalid Find(): " + str(e))
2460
2461 for p in [ "/foo",
2462 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2463 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2464 try:
2465 p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2466 raise Exception("Invalid RemovePersistentGroup accepted")
2467 except dbus.exceptions.DBusException, e:
2468 if "InvalidArgs" not in str(e):
2469 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
2470
2471 try:
2472 dev[0].request("P2P_SET disabled 1")
2473 p2p.Listen(5)
2474 raise Exception("Invalid Listen accepted")
2475 except dbus.exceptions.DBusException, e:
2476 if "UnknownError: Could not start P2P listen" not in str(e):
2477 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2478 finally:
2479 dev[0].request("P2P_SET disabled 0")
2480
2481 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2482 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2483 try:
2484 test_p2p.Listen("foo")
2485 raise Exception("Invalid Listen accepted")
2486 except dbus.exceptions.DBusException, e:
2487 if "InvalidArgs" not in str(e):
2488 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2489
2490 try:
2491 dev[0].request("P2P_SET disabled 1")
2492 p2p.ExtendedListen(dbus.Dictionary({}))
2493 raise Exception("Invalid ExtendedListen accepted")
2494 except dbus.exceptions.DBusException, e:
2495 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2496 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2497 finally:
2498 dev[0].request("P2P_SET disabled 0")
2499
2500 try:
2501 dev[0].request("P2P_SET disabled 1")
2502 args = { 'duration1': 30000, 'interval1': 102400,
2503 'duration2': 20000, 'interval2': 102400 }
2504 p2p.PresenceRequest(args)
2505 raise Exception("Invalid PresenceRequest accepted")
2506 except dbus.exceptions.DBusException, e:
2507 if "UnknownError: Failed to invoke presence request" not in str(e):
2508 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2509 finally:
2510 dev[0].request("P2P_SET disabled 0")
2511
2512 try:
2513 params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
2514 p2p.GroupAdd(params)
2515 raise Exception("Invalid GroupAdd accepted")
2516 except dbus.exceptions.DBusException, e:
2517 if "InvalidArgs" not in str(e):
2518 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2519
2520 try:
2521 params = dbus.Dictionary({'persistent_group_object':
2522 dbus.ObjectPath(path),
2523 'frequency': 2412})
2524 p2p.GroupAdd(params)
2525 raise Exception("Invalid GroupAdd accepted")
2526 except dbus.exceptions.DBusException, e:
2527 if "InvalidArgs" not in str(e):
2528 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2529
2530 try:
2531 p2p.Disconnect()
2532 raise Exception("Invalid Disconnect accepted")
2533 except dbus.exceptions.DBusException, e:
2534 if "UnknownError: failed to disconnect" not in str(e):
2535 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
2536
2537 try:
2538 dev[0].request("P2P_SET disabled 1")
2539 p2p.Flush()
2540 raise Exception("Invalid Flush accepted")
2541 except dbus.exceptions.DBusException, e:
2542 if "Error.Failed: P2P is not available for this interface" not in str(e):
2543 raise Exception("Unexpected error message for invalid Flush: " + str(e))
2544 finally:
2545 dev[0].request("P2P_SET disabled 0")
2546
2547 try:
2548 dev[0].request("P2P_SET disabled 1")
2549 args = { 'peer': path,
2550 'join': True,
2551 'wps_method': 'pbc',
2552 'frequency': 2412 }
2553 pin = p2p.Connect(args)
2554 raise Exception("Invalid Connect accepted")
2555 except dbus.exceptions.DBusException, e:
2556 if "Error.Failed: P2P is not available for this interface" not in str(e):
2557 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2558 finally:
2559 dev[0].request("P2P_SET disabled 0")
2560
2561 tests = [ { 'frequency': dbus.Int32(-1) },
2562 { 'wps_method': 'pbc' },
2563 { 'wps_method': 'foo' } ]
2564 for args in tests:
2565 try:
2566 pin = p2p.Connect(args)
2567 raise Exception("Invalid Connect accepted")
2568 except dbus.exceptions.DBusException, e:
2569 if "InvalidArgs" not in str(e):
2570 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2571
2572 try:
2573 dev[0].request("P2P_SET disabled 1")
2574 args = { 'peer': path }
2575 pin = p2p.Invite(args)
2576 raise Exception("Invalid Invite accepted")
2577 except dbus.exceptions.DBusException, e:
2578 if "Error.Failed: P2P is not available for this interface" not in str(e):
2579 raise Exception("Unexpected error message for invalid Invite: " + str(e))
2580 finally:
2581 dev[0].request("P2P_SET disabled 0")
2582
2583 try:
2584 args = { 'foo': 'bar' }
2585 pin = p2p.Invite(args)
2586 raise Exception("Invalid Invite accepted")
2587 except dbus.exceptions.DBusException, e:
2588 if "InvalidArgs" not in str(e):
2589 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2590
2591 tests = [ (path, 'display', "InvalidArgs"),
2592 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2593 'display',
2594 "UnknownError: Failed to send provision discovery request"),
2595 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2596 'keypad',
2597 "UnknownError: Failed to send provision discovery request"),
2598 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2599 'pbc',
2600 "UnknownError: Failed to send provision discovery request"),
2601 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2602 'pushbutton',
2603 "UnknownError: Failed to send provision discovery request"),
2604 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2605 'foo', "InvalidArgs") ]
2606 for (p,method,err) in tests:
2607 try:
2608 p2p.ProvisionDiscoveryRequest(p, method)
2609 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2610 except dbus.exceptions.DBusException, e:
2611 if err not in str(e):
2612 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
2613
2614 try:
2615 dev[0].request("P2P_SET disabled 1")
2616 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2617 dbus_interface=dbus.PROPERTIES_IFACE)
2618 raise Exception("Invalid Get(Peers) accepted")
2619 except dbus.exceptions.DBusException, e:
2620 if "Error.Failed: P2P is not available for this interface" not in str(e):
2621 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
2622 finally:
2623 dev[0].request("P2P_SET disabled 0")
2624
0e126c6d
JM
2625def test_dbus_p2p_oom(dev, apdev):
2626 """D-Bus P2P operations and OOM"""
2627 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2628 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2629
2630 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array",
2631 "Find", "InvalidArgs"):
2632 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2633
2634 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array",
2635 "Find", "InvalidArgs"):
2636 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2637
2638 with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array",
2639 "Find", "InvalidArgs"):
2640 p2p.Find(dbus.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }))
2641
2642 with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
2643 "Find", "InvalidArgs"):
2644 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2645
2646 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
2647 "Find", "InvalidArgs"):
2648 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2649
2650 with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
2651 "Find", "InvalidArgs"):
2652 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'),
2653 dbus.ByteArray('123'),
2654 dbus.ByteArray('123'),
2655 dbus.ByteArray('123'),
2656 dbus.ByteArray('123'),
2657 dbus.ByteArray('123'),
2658 dbus.ByteArray('123'),
2659 dbus.ByteArray('123'),
2660 dbus.ByteArray('123'),
2661 dbus.ByteArray('123'),
2662 dbus.ByteArray('123') ] }))
2663
2664 with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
2665 "Find", "InvalidArgs"):
2666 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2667
2668 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
2669 "Find", "InvalidArgs"):
2670 p2p.Find(dbus.Dictionary({ 'Foo': path }))
2671
2672 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
2673 "AddService", "InvalidArgs"):
2674 args = { 'service_type': 'bonjour',
2675 'response': dbus.ByteArray(500*'b') }
2676 p2p.AddService(args)
2677
2678 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
2679 "AddService", "InvalidArgs"):
2680 p2p.AddService(args)
2681
2ec82e67
JM
2682def test_dbus_p2p_discovery(dev, apdev):
2683 """D-Bus P2P discovery"""
910eb5b5 2684 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2685 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2686
2687 addr0 = dev[0].p2p_dev_addr()
2688
2689 dev[1].request("SET sec_device_type 1-0050F204-2")
2690 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2691 dev[1].p2p_listen()
2692 addr1 = dev[1].p2p_dev_addr()
2693 a1 = binascii.unhexlify(addr1.replace(':',''))
2694
2695 wfd_devinfo = "00001c440028"
2696 dev[2].request("SET wifi_display 1")
2697 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
2698 wfd = binascii.unhexlify('000006' + wfd_devinfo)
2699 dev[2].p2p_listen()
2700 addr2 = dev[2].p2p_dev_addr()
2701 a2 = binascii.unhexlify(addr2.replace(':',''))
2702
2703 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
2704 dbus_interface=dbus.PROPERTIES_IFACE)
2705 if 'Peers' not in res:
2706 raise Exception("GetAll result missing Peers")
2707 if len(res['Peers']) != 0:
2708 raise Exception("Unexpected peer(s) in the list")
2709
2710 args = {'DiscoveryType': 'social',
2711 'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
2712 'Timeout': dbus.Int32(1) }
2713 p2p.Find(dbus.Dictionary(args))
2714 p2p.StopFind()
2715
2716 class TestDbusP2p(TestDbus):
2717 def __init__(self, bus):
2718 TestDbus.__init__(self, bus)
2719 self.found = False
2720 self.found2 = False
2721 self.lost = False
571a1af2 2722 self.find_stopped = False
2ec82e67
JM
2723
2724 def __enter__(self):
2725 gobject.timeout_add(1, self.run_test)
2726 gobject.timeout_add(15000, self.timeout)
2727 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2728 "DeviceFound")
2729 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
2730 "DeviceLost")
2731 self.add_signal(self.provisionDiscoveryResponseEnterPin,
2732 WPAS_DBUS_IFACE_P2PDEVICE,
2733 "ProvisionDiscoveryResponseEnterPin")
571a1af2
JM
2734 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
2735 "FindStopped")
2ec82e67
JM
2736 self.loop.run()
2737 return self
2738
2739 def deviceFound(self, path):
2740 logger.debug("deviceFound: path=%s" % path)
2741 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2742 dbus_interface=dbus.PROPERTIES_IFACE)
2743 if len(res) < 1:
2744 raise Exception("Unexpected number of peers")
2745 if path not in res:
2746 raise Exception("Mismatch in peer object path")
2747 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
2748 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
2749 dbus_interface=dbus.PROPERTIES_IFACE,
2750 byte_arrays=True)
2751 logger.debug("peer properties: " + str(res))
2752
2753 if res['DeviceAddress'] == a1:
2754 if 'SecondaryDeviceTypes' not in res:
2755 raise Exception("Missing SecondaryDeviceTypes")
2756 sec = res['SecondaryDeviceTypes']
2757 if len(sec) < 1:
2758 raise Exception("Secondary device type missing")
2759 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
2760 raise Exception("Secondary device type mismatch")
2761
2762 if 'VendorExtension' not in res:
2763 raise Exception("Missing VendorExtension")
2764 vendor = res['VendorExtension']
2765 if len(vendor) < 1:
2766 raise Exception("Vendor extension missing")
2767 if "\x11\x22\x33\x44" not in vendor:
2768 raise Exception("Secondary device type mismatch")
2769
2770 self.found = True
2771 elif res['DeviceAddress'] == a2:
2772 if 'IEs' not in res:
2773 raise Exception("IEs missing")
2774 if res['IEs'] != wfd:
2775 raise Exception("IEs mismatch")
2776 self.found2 = True
2777 else:
2778 raise Exception("Unexpected peer device address")
2779
2780 if self.found and self.found2:
2781 p2p.StopFind()
2782 p2p.RejectPeer(path)
2783 p2p.ProvisionDiscoveryRequest(path, 'display')
2784
2785 def deviceLost(self, path):
2786 logger.debug("deviceLost: path=%s" % path)
2787 self.lost = True
2788 try:
2789 p2p.RejectPeer(path)
2790 raise Exception("Invalid RejectPeer accepted")
2791 except dbus.exceptions.DBusException, e:
2792 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2793 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2794 self.loop.quit()
2795
2796 def provisionDiscoveryResponseEnterPin(self, peer_object):
2797 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
2798 p2p.Flush()
2799
571a1af2
JM
2800 def findStopped(self):
2801 logger.debug("findStopped")
2802 self.find_stopped = True
2803
2ec82e67
JM
2804 def run_test(self, *args):
2805 logger.debug("run_test")
2806 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
2807 'Timeout': dbus.Int32(10)}))
2808 return False
2809
2810 def success(self):
571a1af2 2811 return self.found and self.lost and self.found2 and self.find_stopped
2ec82e67
JM
2812
2813 with TestDbusP2p(bus) as t:
2814 if not t.success():
2815 raise Exception("Expected signals not seen")
2816
2817 dev[1].request("VENDOR_ELEM_REMOVE 1 *")
2818 dev[1].p2p_stop_find()
2819
2820 p2p.Listen(1)
2821 dev[2].p2p_stop_find()
2822 dev[2].request("P2P_FLUSH")
2823 if not dev[2].discover_peer(addr0):
2824 raise Exception("Peer not found")
2825 p2p.StopFind()
2826 dev[2].p2p_stop_find()
2827
2828 try:
2829 p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
2830 raise Exception("Invalid ExtendedListen accepted")
2831 except dbus.exceptions.DBusException, e:
2832 if "InvalidArgs" not in str(e):
2833 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
2834
2835 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
2836 p2p.ExtendedListen(dbus.Dictionary({}))
2837 dev[0].global_request("P2P_EXT_LISTEN")
2838
2839def test_dbus_p2p_service_discovery(dev, apdev):
2840 """D-Bus P2P service discovery"""
910eb5b5 2841 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2842 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2843
2844 addr0 = dev[0].p2p_dev_addr()
2845 addr1 = dev[1].p2p_dev_addr()
2846
2847 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
2848 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
2849
2850 args = { 'service_type': 'bonjour',
2851 'query': bonjour_query,
2852 'response': bonjour_response }
2853 p2p.AddService(args)
2854 p2p.FlushService()
2855 p2p.AddService(args)
2856
2857 try:
2858 p2p.DeleteService(args)
2859 raise Exception("Invalid DeleteService() accepted")
2860 except dbus.exceptions.DBusException, e:
2861 if "InvalidArgs" not in str(e):
2862 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2863
2864 args = { 'service_type': 'bonjour',
2865 'query': bonjour_query }
2866 p2p.DeleteService(args)
2867 try:
2868 p2p.DeleteService(args)
2869 raise Exception("Invalid DeleteService() accepted")
2870 except dbus.exceptions.DBusException, e:
2871 if "InvalidArgs" not in str(e):
2872 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2873
2874 args = { 'service_type': 'upnp',
2875 'version': 0x10,
2876 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
2877 p2p.AddService(args)
2878 p2p.DeleteService(args)
2879 try:
2880 p2p.DeleteService(args)
2881 raise Exception("Invalid DeleteService() accepted")
2882 except dbus.exceptions.DBusException, e:
2883 if "InvalidArgs" not in str(e):
2884 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2885
2886 tests = [ { 'service_type': 'foo' },
2887 { 'service_type': 'foo', 'query': bonjour_query },
2888 { 'service_type': 'upnp' },
2889 { 'service_type': 'upnp', 'version': 0x10 },
2890 { 'service_type': 'upnp',
2891 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2892 { 'version': 0x10,
2893 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2894 { 'service_type': 'upnp', 'foo': 'bar' },
2895 { 'service_type': 'bonjour' },
2896 { 'service_type': 'bonjour', 'query': 'foo' },
2897 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2898 for args in tests:
2899 try:
2900 p2p.DeleteService(args)
2901 raise Exception("Invalid DeleteService() accepted")
2902 except dbus.exceptions.DBusException, e:
2903 if "InvalidArgs" not in str(e):
2904 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
2905
2906 tests = [ { 'service_type': 'foo' },
2907 { 'service_type': 'upnp' },
2908 { 'service_type': 'upnp', 'version': 0x10 },
2909 { 'service_type': 'upnp',
2910 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2911 { 'version': 0x10,
2912 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2913 { 'service_type': 'upnp', 'foo': 'bar' },
2914 { 'service_type': 'bonjour' },
2915 { 'service_type': 'bonjour', 'query': 'foo' },
2916 { 'service_type': 'bonjour', 'response': 'foo' },
2917 { 'service_type': 'bonjour', 'query': bonjour_query },
2918 { 'service_type': 'bonjour', 'response': bonjour_response },
2919 { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
2920 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2921 for args in tests:
2922 try:
2923 p2p.AddService(args)
2924 raise Exception("Invalid AddService() accepted")
2925 except dbus.exceptions.DBusException, e:
2926 if "InvalidArgs" not in str(e):
2927 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2928
2929 args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
2930 ref = p2p.ServiceDiscoveryRequest(args)
2931 p2p.ServiceDiscoveryCancelRequest(ref)
2932 try:
2933 p2p.ServiceDiscoveryCancelRequest(ref)
2934 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2935 except dbus.exceptions.DBusException, e:
2936 if "InvalidArgs" not in str(e):
2937 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2938 try:
2939 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
2940 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2941 except dbus.exceptions.DBusException, e:
2942 if "InvalidArgs" not in str(e):
2943 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
2944
2945 args = { 'service_type': 'upnp',
2946 'version': 0x10,
2947 'service': 'ssdp:foo' }
2948 ref = p2p.ServiceDiscoveryRequest(args)
2949 p2p.ServiceDiscoveryCancelRequest(ref)
2950
2951 tests = [ { 'service_type': 'foo' },
2952 { 'foo': 'bar' },
2953 { 'tlv': 'foo' },
2954 { },
2955 { 'version': 0 },
2956 { 'service_type': 'upnp',
2957 'service': 'ssdp:foo' },
2958 { 'service_type': 'upnp',
2959 'version': 0x10 },
2960 { 'service_type': 'upnp',
2961 'version': 0x10,
2962 'service': 'ssdp:foo',
2963 'peer_object': dbus.ObjectPath(path + "/Peers") },
2964 { 'service_type': 'upnp',
2965 'version': 0x10,
2966 'service': 'ssdp:foo',
2967 'peer_object': path + "/Peers" },
2968 { 'service_type': 'upnp',
2969 'version': 0x10,
2970 'service': 'ssdp:foo',
2971 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
2972 for args in tests:
2973 try:
2974 p2p.ServiceDiscoveryRequest(args)
2975 raise Exception("Invalid ServiceDiscoveryRequest accepted")
2976 except dbus.exceptions.DBusException, e:
2977 if "InvalidArgs" not in str(e):
2978 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))
2979
2980 args = { 'foo': 'bar' }
2981 try:
2982 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
2983 raise Exception("Invalid ServiceDiscoveryResponse accepted")
2984 except dbus.exceptions.DBusException, e:
2985 if "InvalidArgs" not in str(e):
2986 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
2987
2988def test_dbus_p2p_service_discovery_query(dev, apdev):
2989 """D-Bus P2P service discovery query"""
910eb5b5 2990 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2991 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2992
2993 addr0 = dev[0].p2p_dev_addr()
2994 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
2995 dev[1].p2p_listen()
2996 addr1 = dev[1].p2p_dev_addr()
2997
2998 class TestDbusP2p(TestDbus):
2999 def __init__(self, bus):
3000 TestDbus.__init__(self, bus)
3001 self.done = False
3002
3003 def __enter__(self):
3004 gobject.timeout_add(1, self.run_test)
3005 gobject.timeout_add(15000, self.timeout)
3006 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3007 "DeviceFound")
3008 self.add_signal(self.serviceDiscoveryResponse,
3009 WPAS_DBUS_IFACE_P2PDEVICE,
3010 "ServiceDiscoveryResponse", byte_arrays=True)
3011 self.loop.run()
3012 return self
3013
3014 def deviceFound(self, path):
3015 logger.debug("deviceFound: path=%s" % path)
3016 args = { 'peer_object': path,
3017 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
3018 p2p.ServiceDiscoveryRequest(args)
3019
3020 def serviceDiscoveryResponse(self, sd_request):
3021 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
3022 self.done = True
3023 self.loop.quit()
3024
3025 def run_test(self, *args):
3026 logger.debug("run_test")
3027 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3028 'Timeout': dbus.Int32(10)}))
3029 return False
3030
3031 def success(self):
3032 return self.done
3033
3034 with TestDbusP2p(bus) as t:
3035 if not t.success():
3036 raise Exception("Expected signals not seen")
3037
3038 dev[1].p2p_stop_find()
3039
3040def test_dbus_p2p_service_discovery_external(dev, apdev):
3041 """D-Bus P2P service discovery with external response"""
3042 try:
3043 _test_dbus_p2p_service_discovery_external(dev, apdev)
3044 finally:
3045 dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3046
3047def _test_dbus_p2p_service_discovery_external(dev, apdev):
910eb5b5 3048 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3049 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3050
3051 addr0 = dev[0].p2p_dev_addr()
3052 addr1 = dev[1].p2p_dev_addr()
3053 resp = "0300000101"
3054
3055 dev[1].request("P2P_FLUSH")
3056 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
3057 dev[1].p2p_find(social=True)
3058
3059 class TestDbusP2p(TestDbus):
3060 def __init__(self, bus):
3061 TestDbus.__init__(self, bus)
3062 self.sd = False
3063
3064 def __enter__(self):
3065 gobject.timeout_add(1, self.run_test)
3066 gobject.timeout_add(15000, self.timeout)
3067 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3068 "DeviceFound")
3069 self.add_signal(self.serviceDiscoveryRequest,
3070 WPAS_DBUS_IFACE_P2PDEVICE,
3071 "ServiceDiscoveryRequest")
3072 self.loop.run()
3073 return self
3074
3075 def deviceFound(self, path):
3076 logger.debug("deviceFound: path=%s" % path)
3077
3078 def serviceDiscoveryRequest(self, sd_request):
3079 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
3080 self.sd = True
3081 args = { 'peer_object': sd_request['peer_object'],
3082 'frequency': sd_request['frequency'],
3083 'dialog_token': sd_request['dialog_token'],
3084 'tlvs': dbus.ByteArray(binascii.unhexlify(resp)) }
3085 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3086 self.loop.quit()
3087
3088 def run_test(self, *args):
3089 logger.debug("run_test")
3090 p2p.ServiceDiscoveryExternal(1)
3091 p2p.ServiceUpdate()
3092 p2p.Listen(15)
3093 return False
3094
3095 def success(self):
3096 return self.sd
3097
3098 with TestDbusP2p(bus) as t:
3099 if not t.success():
3100 raise Exception("Expected signals not seen")
3101
3102 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
3103 if ev is None:
3104 raise Exception("Service discovery timed out")
3105 if addr0 not in ev:
3106 raise Exception("Unexpected address in SD Response: " + ev)
3107 if ev.split(' ')[4] != resp:
3108 raise Exception("Unexpected response data SD Response: " + ev)
3109 dev[1].p2p_stop_find()
3110
3111 p2p.StopFind()
3112 p2p.ServiceDiscoveryExternal(0)
3113
3114def test_dbus_p2p_autogo(dev, apdev):
3115 """D-Bus P2P autonomous GO"""
910eb5b5 3116 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3117 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3118
3119 addr0 = dev[0].p2p_dev_addr()
3120
3121 class TestDbusP2p(TestDbus):
3122 def __init__(self, bus):
3123 TestDbus.__init__(self, bus)
3124 self.first = True
3125 self.waiting_end = False
035efb2c 3126 self.exceptions = False
001c4bf5 3127 self.deauthorized = False
2ec82e67
JM
3128 self.done = False
3129
3130 def __enter__(self):
3131 gobject.timeout_add(1, self.run_test)
3132 gobject.timeout_add(15000, self.timeout)
3133 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3134 "DeviceFound")
3135 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3136 "GroupStarted")
3137 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3138 "GroupFinished")
3139 self.add_signal(self.persistentGroupAdded,
3140 WPAS_DBUS_IFACE_P2PDEVICE,
3141 "PersistentGroupAdded")
3142 self.add_signal(self.persistentGroupRemoved,
3143 WPAS_DBUS_IFACE_P2PDEVICE,
3144 "PersistentGroupRemoved")
3145 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3146 WPAS_DBUS_IFACE_P2PDEVICE,
3147 "ProvisionDiscoveryRequestDisplayPin")
3148 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3149 "StaAuthorized")
001c4bf5
JM
3150 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
3151 "StaDeauthorized")
2ec82e67
JM
3152 self.loop.run()
3153 return self
3154
3155 def groupStarted(self, properties):
3156 logger.debug("groupStarted: " + str(properties))
3157 self.group = properties['group_object']
cb346b49
JM
3158 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3159 properties['interface_object'])
3160 role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3161 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3162 if role != "GO":
035efb2c 3163 self.exceptions = True
2ec82e67 3164 raise Exception("Unexpected role reported: " + role)
cb346b49
JM
3165 group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3166 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3167 if group != properties['group_object']:
035efb2c 3168 self.exceptions = True
2ec82e67 3169 raise Exception("Unexpected Group reported: " + str(group))
cb346b49
JM
3170 go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3171 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3172 if go != '/':
035efb2c 3173 self.exceptions = True
2ec82e67
JM
3174 raise Exception("Unexpected PeerGO value: " + str(go))
3175 if self.first:
3176 self.first = False
3177 logger.info("Remove persistent group instance")
cb346b49
JM
3178 group_p2p = dbus.Interface(self.g_if_obj,
3179 WPAS_DBUS_IFACE_P2PDEVICE)
3180 group_p2p.Disconnect()
2ec82e67
JM
3181 else:
3182 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3183 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
3184
3185 def groupFinished(self, properties):
3186 logger.debug("groupFinished: " + str(properties))
3187 if self.waiting_end:
3188 logger.info("Remove persistent group")
3189 p2p.RemovePersistentGroup(self.persistent)
3190 else:
3191 logger.info("Re-start persistent group")
3192 params = dbus.Dictionary({'persistent_group_object':
3193 self.persistent,
3194 'frequency': 2412})
3195 p2p.GroupAdd(params)
3196
3197 def persistentGroupAdded(self, path, properties):
3198 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3199 self.persistent = path
3200
3201 def persistentGroupRemoved(self, path):
3202 logger.debug("persistentGroupRemoved: %s" % path)
3203 self.done = True
3204 self.loop.quit()
3205
3206 def deviceFound(self, path):
3207 logger.debug("deviceFound: path=%s" % path)
3208 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3209 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3210 dbus_interface=dbus.PROPERTIES_IFACE,
3211 byte_arrays=True)
3212 logger.debug('peer properties: ' + str(self.peer))
3213
3214 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3215 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3216 self.peer_path = peer_object
3217 peer = binascii.unhexlify(peer_object.split('/')[-1])
3218 addr = ""
3219 for p in peer:
3220 if len(addr) > 0:
3221 addr += ':'
3222 addr += '%02x' % ord(p)
795b6f57
JM
3223
3224 params = { 'Role': 'registrar',
3225 'P2PDeviceAddress': self.peer['DeviceAddress'],
3226 'Bssid': self.peer['DeviceAddress'],
3227 'Type': 'pin' }
cb346b49 3228 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
795b6f57
JM
3229 try:
3230 wps.Start(params)
035efb2c 3231 self.exceptions = True
795b6f57
JM
3232 raise Exception("Invalid WPS.Start() accepted")
3233 except dbus.exceptions.DBusException, e:
3234 if "InvalidArgs" not in str(e):
035efb2c 3235 self.exceptions = True
795b6f57 3236 raise Exception("Unexpected error message: " + str(e))
2ec82e67
JM
3237 params = { 'Role': 'registrar',
3238 'P2PDeviceAddress': self.peer['DeviceAddress'],
2ec82e67
JM
3239 'Type': 'pin',
3240 'Pin': '12345670' }
3241 logger.info("Authorize peer to connect to the group")
3242 wps.Start(params)
3243
3244 def staAuthorized(self, name):
3245 logger.debug("staAuthorized: " + name)
3246 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
3247 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3248 dbus_interface=dbus.PROPERTIES_IFACE,
3249 byte_arrays=True)
035efb2c 3250 logger.debug("Peer properties: " + str(res))
2ec82e67 3251 if 'Groups' not in res or len(res['Groups']) != 1:
035efb2c 3252 self.exceptions = True
2ec82e67
JM
3253 raise Exception("Unexpected number of peer Groups entries")
3254 if res['Groups'][0] != self.group:
035efb2c 3255 self.exceptions = True
2ec82e67
JM
3256 raise Exception("Unexpected peer Groups[0] value")
3257
3258 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
3259 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3260 dbus_interface=dbus.PROPERTIES_IFACE,
3261 byte_arrays=True)
3262 logger.debug("Group properties: " + str(res))
3263 if 'Members' not in res or len(res['Members']) != 1:
035efb2c 3264 self.exceptions = True
2ec82e67
JM
3265 raise Exception("Unexpected number of group members")
3266
3267 ext = dbus.ByteArray("\x11\x22\x33\x44")
3268 # Earlier implementation of this interface was a bit strange. The
3269 # property is defined to have aay signature and that is what the
3270 # getter returned. However, the setter expected there to be a
3271 # dictionary with 'WPSVendorExtensions' as the key surrounding these
3272 # values.. The current implementations maintains support for that
3273 # for backwards compability reasons. Verify that encoding first.
3274 vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
3275 signature='sv')
3276 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3277 dbus_interface=dbus.PROPERTIES_IFACE)
3278 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3279 dbus_interface=dbus.PROPERTIES_IFACE,
3280 byte_arrays=True)
3281 if len(res) != 1:
035efb2c 3282 self.exceptions = True
2ec82e67
JM
3283 raise Exception("Unexpected number of vendor extensions")
3284 if res[0] != ext:
035efb2c 3285 self.exceptions = True
2ec82e67
JM
3286 raise Exception("Vendor extension value changed")
3287
3288 # And now verify that the more appropriate encoding is accepted as
3289 # well.
3290 res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
3291 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3292 dbus_interface=dbus.PROPERTIES_IFACE)
3293 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3294 dbus_interface=dbus.PROPERTIES_IFACE,
3295 byte_arrays=True)
3296 if len(res) != 2:
035efb2c 3297 self.exceptions = True
2ec82e67
JM
3298 raise Exception("Unexpected number of vendor extensions")
3299 if res[0] != res2[0] or res[1] != res2[1]:
035efb2c 3300 self.exceptions = True
2ec82e67
JM
3301 raise Exception("Vendor extension value changed")
3302
3303 for i in range(10):
3304 res.append(dbus.ByteArray('\xaa\xbb'))
3305 try:
3306 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3307 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3308 self.exceptions = True
2ec82e67
JM
3309 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3310 except dbus.exceptions.DBusException, e:
3311 if "Error.Failed" not in str(e):
035efb2c 3312 self.exceptions = True
2ec82e67
JM
3313 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3314
3315 vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
3316 try:
3317 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3318 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3319 self.exceptions = True
2ec82e67
JM
3320 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3321 except dbus.exceptions.DBusException, e:
3322 if "InvalidArgs" not in str(e):
035efb2c 3323 self.exceptions = True
2ec82e67
JM
3324 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3325
3326 vals = [ "foo" ]
3327 try:
3328 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3329 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3330 self.exceptions = True
2ec82e67
JM
3331 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3332 except dbus.exceptions.DBusException, e:
3333 if "Error.Failed" not in str(e):
035efb2c 3334 self.exceptions = True
2ec82e67
JM
3335 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3336
3337 vals = [ [ "foo" ] ]
3338 try:
3339 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3340 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3341 self.exceptions = True
2ec82e67
JM
3342 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3343 except dbus.exceptions.DBusException, e:
3344 if "Error.Failed" not in str(e):
035efb2c 3345 self.exceptions = True
2ec82e67
JM
3346 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3347
001c4bf5
JM
3348 p2p.RemoveClient({ 'peer': self.peer_path })
3349
2ec82e67 3350 self.waiting_end = True
cb346b49
JM
3351 group_p2p = dbus.Interface(self.g_if_obj,
3352 WPAS_DBUS_IFACE_P2PDEVICE)
3353 group_p2p.Disconnect()
2ec82e67 3354
001c4bf5
JM
3355 def staDeauthorized(self, name):
3356 logger.debug("staDeauthorized: " + name)
3357 self.deauthorized = True
3358
2ec82e67
JM
3359 def run_test(self, *args):
3360 logger.debug("run_test")
3361 params = dbus.Dictionary({'persistent': True,
3362 'frequency': 2412})
3363 logger.info("Add a persistent group")
3364 p2p.GroupAdd(params)
3365 return False
3366
3367 def success(self):
035efb2c 3368 return self.done and self.deauthorized and not self.exceptions
2ec82e67
JM
3369
3370 with TestDbusP2p(bus) as t:
3371 if not t.success():
3372 raise Exception("Expected signals not seen")
3373
3374 dev[1].wait_go_ending_session()
3375
3376def test_dbus_p2p_autogo_pbc(dev, apdev):
3377 """D-Bus P2P autonomous GO and PBC"""
910eb5b5 3378 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3379 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3380
3381 addr0 = dev[0].p2p_dev_addr()
3382
3383 class TestDbusP2p(TestDbus):
3384 def __init__(self, bus):
3385 TestDbus.__init__(self, bus)
3386 self.first = True
3387 self.waiting_end = False
3388 self.done = False
3389
3390 def __enter__(self):
3391 gobject.timeout_add(1, self.run_test)
3392 gobject.timeout_add(15000, self.timeout)
3393 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3394 "DeviceFound")
3395 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3396 "GroupStarted")
3397 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3398 "GroupFinished")
3399 self.add_signal(self.provisionDiscoveryPBCRequest,
3400 WPAS_DBUS_IFACE_P2PDEVICE,
3401 "ProvisionDiscoveryPBCRequest")
3402 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3403 "StaAuthorized")
3404 self.loop.run()
3405 return self
3406
3407 def groupStarted(self, properties):
3408 logger.debug("groupStarted: " + str(properties))
3409 self.group = properties['group_object']
cb346b49
JM
3410 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3411 properties['interface_object'])
2ec82e67
JM
3412 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3413 dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
3414
3415 def groupFinished(self, properties):
3416 logger.debug("groupFinished: " + str(properties))
3417 self.done = True
3418 self.loop.quit()
3419
3420 def deviceFound(self, path):
3421 logger.debug("deviceFound: path=%s" % path)
3422 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3423 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3424 dbus_interface=dbus.PROPERTIES_IFACE,
3425 byte_arrays=True)
3426 logger.debug('peer properties: ' + str(self.peer))
3427
3428 def provisionDiscoveryPBCRequest(self, peer_object):
3429 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
3430 self.peer_path = peer_object
3431 peer = binascii.unhexlify(peer_object.split('/')[-1])
3432 addr = ""
3433 for p in peer:
3434 if len(addr) > 0:
3435 addr += ':'
3436 addr += '%02x' % ord(p)
3437 params = { 'Role': 'registrar',
3438 'P2PDeviceAddress': self.peer['DeviceAddress'],
2ec82e67
JM
3439 'Type': 'pbc' }
3440 logger.info("Authorize peer to connect to the group")
cb346b49 3441 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67
JM
3442 wps.Start(params)
3443
3444 def staAuthorized(self, name):
3445 logger.debug("staAuthorized: " + name)
cb346b49
JM
3446 group_p2p = dbus.Interface(self.g_if_obj,
3447 WPAS_DBUS_IFACE_P2PDEVICE)
3448 group_p2p.Disconnect()
2ec82e67
JM
3449
3450 def run_test(self, *args):
3451 logger.debug("run_test")
3452 params = dbus.Dictionary({'frequency': 2412})
3453 p2p.GroupAdd(params)
3454 return False
3455
3456 def success(self):
3457 return self.done
3458
3459 with TestDbusP2p(bus) as t:
3460 if not t.success():
3461 raise Exception("Expected signals not seen")
3462
3463 dev[1].wait_go_ending_session()
3464 dev[1].flush_scan_cache()
3465
3466def test_dbus_p2p_autogo_legacy(dev, apdev):
3467 """D-Bus P2P autonomous GO and legacy STA"""
910eb5b5 3468 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3469 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3470
3471 addr0 = dev[0].p2p_dev_addr()
3472
3473 class TestDbusP2p(TestDbus):
3474 def __init__(self, bus):
3475 TestDbus.__init__(self, bus)
3476 self.done = False
3477
3478 def __enter__(self):
3479 gobject.timeout_add(1, self.run_test)
3480 gobject.timeout_add(15000, self.timeout)
3481 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3482 "GroupStarted")
3483 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3484 "GroupFinished")
3485 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3486 "StaAuthorized")
3487 self.loop.run()
3488 return self
3489
3490 def groupStarted(self, properties):
3491 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
3492 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3493 properties['group_object'])
3494 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3495 dbus_interface=dbus.PROPERTIES_IFACE,
3496 byte_arrays=True)
3497 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
3498
2ec82e67
JM
3499 pin = '12345670'
3500 params = { 'Role': 'enrollee',
3501 'Type': 'pin',
3502 'Pin': pin }
cb346b49
JM
3503 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3504 properties['interface_object'])
3505 wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67
JM
3506 wps.Start(params)
3507 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
3508 dev1.scan_for_bss(bssid, freq=2412)
3509 dev1.request("WPS_PIN " + bssid + " " + pin)
3510 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3511
3512 def groupFinished(self, properties):
3513 logger.debug("groupFinished: " + str(properties))
3514 self.done = True
3515 self.loop.quit()
3516
3517 def staAuthorized(self, name):
3518 logger.debug("staAuthorized: " + name)
3519 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3520 dev1.request("DISCONNECT")
cb346b49 3521 self.group_p2p.Disconnect()
2ec82e67
JM
3522
3523 def run_test(self, *args):
3524 logger.debug("run_test")
3525 params = dbus.Dictionary({'frequency': 2412})
3526 p2p.GroupAdd(params)
3527 return False
3528
3529 def success(self):
3530 return self.done
3531
3532 with TestDbusP2p(bus) as t:
3533 if not t.success():
3534 raise Exception("Expected signals not seen")
3535
3536def test_dbus_p2p_join(dev, apdev):
3537 """D-Bus P2P join an autonomous GO"""
910eb5b5 3538 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3539 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3540
3541 addr1 = dev[1].p2p_dev_addr()
3542 addr2 = dev[2].p2p_dev_addr()
3543 dev[1].p2p_start_go(freq=2412)
cb346b49 3544 dev1_group_ifname = dev[1].group_ifname
2ec82e67
JM
3545 dev[2].p2p_listen()
3546
3547 class TestDbusP2p(TestDbus):
3548 def __init__(self, bus):
3549 TestDbus.__init__(self, bus)
3550 self.done = False
3551 self.peer = None
3552 self.go = None
3553
3554 def __enter__(self):
3555 gobject.timeout_add(1, self.run_test)
3556 gobject.timeout_add(15000, self.timeout)
3557 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3558 "DeviceFound")
3559 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3560 "GroupStarted")
3561 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3562 "GroupFinished")
3563 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
3564 "InvitationResult")
3565 self.loop.run()
3566 return self
3567
3568 def deviceFound(self, path):
3569 logger.debug("deviceFound: path=%s" % path)
3570 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3571 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3572 dbus_interface=dbus.PROPERTIES_IFACE,
3573 byte_arrays=True)
3574 logger.debug('peer properties: ' + str(res))
3575 if addr2.replace(':','') in path:
3576 self.peer = path
3577 elif addr1.replace(':','') in path:
3578 self.go = path
3579 if self.peer and self.go:
3580 logger.info("Join the group")
3581 p2p.StopFind()
3582 args = { 'peer': self.go,
3583 'join': True,
3584 'wps_method': 'pin',
3585 'frequency': 2412 }
3586 pin = p2p.Connect(args)
3587
3588 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
3589 dev1.group_ifname = dev1_group_ifname
3590 dev1.group_request("WPS_PIN any " + pin)
2ec82e67
JM
3591
3592 def groupStarted(self, properties):
3593 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
3594 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3595 properties['interface_object'])
3596 role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3597 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3598 if role != "client":
3599 raise Exception("Unexpected role reported: " + role)
cb346b49
JM
3600 group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3601 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3602 if group != properties['group_object']:
3603 raise Exception("Unexpected Group reported: " + str(group))
cb346b49
JM
3604 go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3605 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3606 if go != self.go:
3607 raise Exception("Unexpected PeerGO value: " + str(go))
3608
3609 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3610 properties['group_object'])
3611 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3612 dbus_interface=dbus.PROPERTIES_IFACE,
3613 byte_arrays=True)
3614 logger.debug("Group properties: " + str(res))
3615
3616 ext = dbus.ByteArray("\x11\x22\x33\x44")
3617 try:
3618 # Set(WPSVendorExtensions) not allowed for P2P Client
3619 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3620 dbus_interface=dbus.PROPERTIES_IFACE)
3621 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3622 except dbus.exceptions.DBusException, e:
3623 if "Error.Failed: Failed to set property" not in str(e):
3624 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3625
cb346b49 3626 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3627 args = { 'duration1': 30000, 'interval1': 102400,
3628 'duration2': 20000, 'interval2': 102400 }
cb346b49 3629 group_p2p.PresenceRequest(args)
2ec82e67
JM
3630
3631 args = { 'peer': self.peer }
cb346b49 3632 group_p2p.Invite(args)
2ec82e67
JM
3633
3634 def groupFinished(self, properties):
3635 logger.debug("groupFinished: " + str(properties))
3636 self.done = True
3637 self.loop.quit()
3638
3639 def invitationResult(self, result):
3640 logger.debug("invitationResult: " + str(result))
3641 if result['status'] != 1:
3642 raise Exception("Unexpected invitation result: " + str(result))
3643 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 3644 dev1.group_ifname = dev1_group_ifname
2ec82e67
JM
3645 dev1.remove_group()
3646
3647 def run_test(self, *args):
3648 logger.debug("run_test")
3649 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
3650 return False
3651
3652 def success(self):
3653 return self.done
3654
3655 with TestDbusP2p(bus) as t:
3656 if not t.success():
3657 raise Exception("Expected signals not seen")
3658
3659 dev[2].p2p_stop_find()
3660
1992af11
JM
3661def test_dbus_p2p_invitation_received(dev, apdev):
3662 """D-Bus P2P and InvitationReceived"""
3663 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3664 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3665
3666 form(dev[0], dev[1])
3667 addr0 = dev[0].p2p_dev_addr()
3668 dev[0].p2p_listen()
3669 dev[0].global_request("SET persistent_reconnect 0")
3670
3671 if not dev[1].discover_peer(addr0, social=True):
3672 raise Exception("Peer " + addr0 + " not found")
3673 peer = dev[1].get_peer(addr0)
3674
3675 class TestDbusP2p(TestDbus):
3676 def __init__(self, bus):
3677 TestDbus.__init__(self, bus)
3678 self.done = False
3679
3680 def __enter__(self):
3681 gobject.timeout_add(1, self.run_test)
3682 gobject.timeout_add(15000, self.timeout)
3683 self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
3684 "InvitationReceived")
3685 self.loop.run()
3686 return self
3687
3688 def invitationReceived(self, result):
3689 logger.debug("invitationReceived: " + str(result))
3690 self.done = True
3691 self.loop.quit()
3692
3693 def run_test(self, *args):
3694 logger.debug("run_test")
3695 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3696 cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
3697 dev1.global_request(cmd)
3698 return False
3699
3700 def success(self):
3701 return self.done
3702
3703 with TestDbusP2p(bus) as t:
3704 if not t.success():
3705 raise Exception("Expected signals not seen")
3706
3707 dev[0].p2p_stop_find()
3708 dev[1].p2p_stop_find()
3709
2ec82e67
JM
3710def test_dbus_p2p_config(dev, apdev):
3711 """D-Bus Get/Set P2PDeviceConfig"""
3712 try:
3713 _test_dbus_p2p_config(dev, apdev)
3714 finally:
3715 dev[0].request("P2P_SET ssid_postfix ")
3716
3717def _test_dbus_p2p_config(dev, apdev):
910eb5b5 3718 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3719 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3720
3721 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3722 dbus_interface=dbus.PROPERTIES_IFACE,
3723 byte_arrays=True)
3724 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
3725 dbus_interface=dbus.PROPERTIES_IFACE)
3726 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3727 dbus_interface=dbus.PROPERTIES_IFACE,
3728 byte_arrays=True)
3729
3730 if len(res) != len(res2):
3731 raise Exception("Different number of parameters")
3732 for k in res:
3733 if res[k] != res2[k]:
3734 raise Exception("Parameter %s value changes" % k)
3735
3736 changes = { 'SsidPostfix': 'foo',
3737 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
3738 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
3739 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3740 dbus.Dictionary(changes, signature='sv'),
3741 dbus_interface=dbus.PROPERTIES_IFACE)
3742
3743 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3744 dbus_interface=dbus.PROPERTIES_IFACE,
3745 byte_arrays=True)
3746 logger.debug("P2PDeviceConfig: " + str(res2))
3747 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
3748 raise Exception("VendorExtension does not match")
3749 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
3750 raise Exception("SecondaryDeviceType does not match")
3751
3752 changes = { 'SsidPostfix': '',
3753 'VendorExtension': dbus.Array([], signature="ay"),
3754 'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
3755 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3756 dbus.Dictionary(changes, signature='sv'),
3757 dbus_interface=dbus.PROPERTIES_IFACE)
3758
3759 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3760 dbus_interface=dbus.PROPERTIES_IFACE,
3761 byte_arrays=True)
3762 logger.debug("P2PDeviceConfig: " + str(res3))
3763 if 'VendorExtension' in res3:
3764 raise Exception("VendorExtension not removed")
3765 if 'SecondaryDeviceTypes' in res3:
3766 raise Exception("SecondaryDeviceType not removed")
3767
3768 try:
3769 dev[0].request("P2P_SET disabled 1")
3770 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3771 dbus_interface=dbus.PROPERTIES_IFACE,
3772 byte_arrays=True)
3773 raise Exception("Invalid Get(P2PDeviceConfig) accepted")
3774 except dbus.exceptions.DBusException, e:
3775 if "Error.Failed: P2P is not available for this interface" not in str(e):
3776 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3777 finally:
3778 dev[0].request("P2P_SET disabled 0")
3779
3780 try:
3781 dev[0].request("P2P_SET disabled 1")
3782 changes = { 'SsidPostfix': 'foo' }
3783 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3784 dbus.Dictionary(changes, signature='sv'),
3785 dbus_interface=dbus.PROPERTIES_IFACE)
3786 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
3787 except dbus.exceptions.DBusException, e:
3788 if "Error.Failed: P2P is not available for this interface" not in str(e):
3789 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3790 finally:
3791 dev[0].request("P2P_SET disabled 0")
3792
3793 tests = [ { 'DeviceName': 123 },
3794 { 'SsidPostfix': 123 },
3795 { 'Foo': 'Bar' } ]
3796 for changes in tests:
3797 try:
3798 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
3799 dbus.Dictionary(changes, signature='sv'),
3800 dbus_interface=dbus.PROPERTIES_IFACE)
3801 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
3802 except dbus.exceptions.DBusException, e:
3803 if "InvalidArgs" not in str(e):
3804 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3805
3806def test_dbus_p2p_persistent(dev, apdev):
3807 """D-Bus P2P persistent group"""
910eb5b5 3808 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3809 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3810
3811 class TestDbusP2p(TestDbus):
3812 def __init__(self, bus):
3813 TestDbus.__init__(self, bus)
3814
3815 def __enter__(self):
3816 gobject.timeout_add(1, self.run_test)
3817 gobject.timeout_add(15000, self.timeout)
3818 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3819 "GroupStarted")
3820 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3821 "GroupFinished")
3822 self.add_signal(self.persistentGroupAdded,
3823 WPAS_DBUS_IFACE_P2PDEVICE,
3824 "PersistentGroupAdded")
3825 self.loop.run()
3826 return self
3827
3828 def groupStarted(self, properties):
3829 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
3830 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3831 properties['interface_object'])
3832 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3833 group_p2p.Disconnect()
2ec82e67
JM
3834
3835 def groupFinished(self, properties):
3836 logger.debug("groupFinished: " + str(properties))
3837 self.loop.quit()
3838
3839 def persistentGroupAdded(self, path, properties):
3840 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3841 self.persistent = path
3842
3843 def run_test(self, *args):
3844 logger.debug("run_test")
3845 params = dbus.Dictionary({'persistent': True,
3846 'frequency': 2412})
3847 logger.info("Add a persistent group")
3848 p2p.GroupAdd(params)
3849 return False
3850
3851 def success(self):
3852 return True
3853
3854 with TestDbusP2p(bus) as t:
3855 if not t.success():
3856 raise Exception("Expected signals not seen")
3857 persistent = t.persistent
3858
3859 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
3860 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
3861 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
3862 logger.info("Persistent group Properties: " + str(res))
3863 vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
3864 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
3865 dbus_interface=dbus.PROPERTIES_IFACE)
3866 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
3867 dbus_interface=dbus.PROPERTIES_IFACE)
3868 if len(res) != len(res2):
3869 raise Exception("Different number of parameters")
3870 for k in res:
3871 if k != 'ssid' and res[k] != res2[k]:
3872 raise Exception("Parameter %s value changes" % k)
3873 if res2['ssid'] != '"DIRECT-foo"':
3874 raise Exception("Unexpected ssid")
3875
3876 args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
3877 'psk': '1234567890' }, signature='sv')
3878 group = p2p.AddPersistentGroup(args)
3879
3880 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
3881 dbus_interface=dbus.PROPERTIES_IFACE)
3882 if len(groups) != 2:
3883 raise Exception("Unexpected number of persistent groups: " + str(groups))
3884
3885 p2p.RemoveAllPersistentGroups()
3886
3887 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
3888 dbus_interface=dbus.PROPERTIES_IFACE)
3889 if len(groups) != 0:
3890 raise Exception("Unexpected number of persistent groups: " + str(groups))
3891
3892 try:
3893 p2p.RemovePersistentGroup(persistent)
3894 raise Exception("Invalid RemovePersistentGroup accepted")
3895 except dbus.exceptions.DBusException, e:
3896 if "NetworkUnknown: There is no such persistent group" not in str(e):
3897 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
3898
3899def test_dbus_p2p_reinvoke_persistent(dev, apdev):
3900 """D-Bus P2P reinvoke persistent group"""
910eb5b5 3901 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3902 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3903
3904 addr0 = dev[0].p2p_dev_addr()
3905
3906 class TestDbusP2p(TestDbus):
3907 def __init__(self, bus):
3908 TestDbus.__init__(self, bus)
3909 self.first = True
3910 self.waiting_end = False
3911 self.done = False
3912 self.invited = False
3913
3914 def __enter__(self):
3915 gobject.timeout_add(1, self.run_test)
3916 gobject.timeout_add(15000, self.timeout)
3917 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3918 "DeviceFound")
3919 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3920 "GroupStarted")
3921 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3922 "GroupFinished")
3923 self.add_signal(self.persistentGroupAdded,
3924 WPAS_DBUS_IFACE_P2PDEVICE,
3925 "PersistentGroupAdded")
3926 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3927 WPAS_DBUS_IFACE_P2PDEVICE,
3928 "ProvisionDiscoveryRequestDisplayPin")
3929 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3930 "StaAuthorized")
3931 self.loop.run()
3932 return self
3933
3934 def groupStarted(self, properties):
3935 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
3936 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3937 properties['interface_object'])
2ec82e67 3938 if not self.invited:
cb346b49
JM
3939 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3940 properties['group_object'])
3941 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3942 dbus_interface=dbus.PROPERTIES_IFACE,
3943 byte_arrays=True)
3944 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
2ec82e67 3945 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 3946 dev1.scan_for_bss(bssid, freq=2412)
2ec82e67
JM
3947 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
3948
3949 def groupFinished(self, properties):
3950 logger.debug("groupFinished: " + str(properties))
3951 if self.invited:
3952 self.done = True
3953 self.loop.quit()
3954 else:
3955 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 3956 dev1.global_request("SET persistent_reconnect 1")
2ec82e67
JM
3957 dev1.p2p_listen()
3958
3959 args = { 'persistent_group_object': dbus.ObjectPath(path),
3960 'peer': self.peer_path }
3961 try:
3962 pin = p2p.Invite(args)
3963 raise Exception("Invalid Invite accepted")
3964 except dbus.exceptions.DBusException, e:
3965 if "InvalidArgs" not in str(e):
3966 raise Exception("Unexpected error message for invalid Invite: " + str(e))
3967
3968 args = { 'persistent_group_object': self.persistent,
3969 'peer': self.peer_path }
3970 pin = p2p.Invite(args)
3971 self.invited = True
3972
cb346b49
JM
3973 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
3974 timeout=15)
3975 if self.sta_group_ev is None:
3976 raise Exception("P2P-GROUP-STARTED event not seen")
3977
2ec82e67
JM
3978 def persistentGroupAdded(self, path, properties):
3979 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3980 self.persistent = path
3981
3982 def deviceFound(self, path):
3983 logger.debug("deviceFound: path=%s" % path)
3984 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3985 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3986 dbus_interface=dbus.PROPERTIES_IFACE,
3987 byte_arrays=True)
3988
3989 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3990 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3991 self.peer_path = peer_object
3992 peer = binascii.unhexlify(peer_object.split('/')[-1])
3993 addr = ""
3994 for p in peer:
3995 if len(addr) > 0:
3996 addr += ':'
3997 addr += '%02x' % ord(p)
3998 params = { 'Role': 'registrar',
3999 'P2PDeviceAddress': self.peer['DeviceAddress'],
4000 'Bssid': self.peer['DeviceAddress'],
4001 'Type': 'pin',
4002 'Pin': '12345670' }
4003 logger.info("Authorize peer to connect to the group")
cb346b49
JM
4004 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4005 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67 4006 wps.Start(params)
cb346b49
JM
4007 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
4008 timeout=15)
4009 if self.sta_group_ev is None:
4010 raise Exception("P2P-GROUP-STARTED event not seen")
2ec82e67
JM
4011
4012 def staAuthorized(self, name):
4013 logger.debug("staAuthorized: " + name)
4014 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4015 dev1.group_form_result(self.sta_group_ev)
2ec82e67 4016 dev1.remove_group()
cb346b49 4017 ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
2ec82e67
JM
4018 if ev is None:
4019 raise Exception("Group removal timed out")
cb346b49
JM
4020 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4021 group_p2p.Disconnect()
2ec82e67
JM
4022
4023 def run_test(self, *args):
4024 logger.debug("run_test")
4025 params = dbus.Dictionary({'persistent': True,
4026 'frequency': 2412})
4027 logger.info("Add a persistent group")
4028 p2p.GroupAdd(params)
4029 return False
4030
4031 def success(self):
4032 return self.done
4033
4034 with TestDbusP2p(bus) as t:
4035 if not t.success():
4036 raise Exception("Expected signals not seen")
4037
4038def test_dbus_p2p_go_neg_rx(dev, apdev):
4039 """D-Bus P2P GO Negotiation receive"""
910eb5b5 4040 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4041 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4042 addr0 = dev[0].p2p_dev_addr()
4043
4044 class TestDbusP2p(TestDbus):
4045 def __init__(self, bus):
4046 TestDbus.__init__(self, bus)
4047 self.done = False
4048
4049 def __enter__(self):
4050 gobject.timeout_add(1, self.run_test)
4051 gobject.timeout_add(15000, self.timeout)
4052 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4053 "DeviceFound")
4054 self.add_signal(self.goNegotiationRequest,
4055 WPAS_DBUS_IFACE_P2PDEVICE,
4056 "GONegotiationRequest",
4057 byte_arrays=True)
4058 self.add_signal(self.goNegotiationSuccess,
4059 WPAS_DBUS_IFACE_P2PDEVICE,
4060 "GONegotiationSuccess",
4061 byte_arrays=True)
4062 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4063 "GroupStarted")
4064 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4065 "GroupFinished")
4066 self.loop.run()
4067 return self
4068
4069 def deviceFound(self, path):
4070 logger.debug("deviceFound: path=%s" % path)
4071
f5d5161d
JM
4072 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4073 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
2ec82e67
JM
4074 if dev_passwd_id != 1:
4075 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4076 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4077 'go_intent': 15, 'persistent': False, 'frequency': 5175 }
4078 try:
4079 p2p.Connect(args)
4080 raise Exception("Invalid Connect accepted")
4081 except dbus.exceptions.DBusException, e:
4082 if "ConnectChannelUnsupported" not in str(e):
4083 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4084
4085 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4086 'go_intent': 15, 'persistent': False }
4087 p2p.Connect(args)
4088
4089 def goNegotiationSuccess(self, properties):
4090 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4091
4092 def groupStarted(self, properties):
4093 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4094 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4095 properties['interface_object'])
4096 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4097 group_p2p.Disconnect()
2ec82e67
JM
4098
4099 def groupFinished(self, properties):
4100 logger.debug("groupFinished: " + str(properties))
4101 self.done = True
4102 self.loop.quit()
4103
4104 def run_test(self, *args):
4105 logger.debug("run_test")
4106 p2p.Listen(10)
4107 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4108 if not dev1.discover_peer(addr0):
4109 raise Exception("Peer not found")
4110 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
4111 return False
4112
4113 def success(self):
4114 return self.done
4115
4116 with TestDbusP2p(bus) as t:
4117 if not t.success():
4118 raise Exception("Expected signals not seen")
4119
4120def test_dbus_p2p_go_neg_auth(dev, apdev):
4121 """D-Bus P2P GO Negotiation authorized"""
910eb5b5 4122 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4123 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4124 addr0 = dev[0].p2p_dev_addr()
4125 dev[1].p2p_listen()
4126
4127 class TestDbusP2p(TestDbus):
4128 def __init__(self, bus):
4129 TestDbus.__init__(self, bus)
4130 self.done = False
4131 self.peer_joined = False
4132 self.peer_disconnected = False
4133
4134 def __enter__(self):
4135 gobject.timeout_add(1, self.run_test)
4136 gobject.timeout_add(15000, self.timeout)
4137 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4138 "DeviceFound")
4139 self.add_signal(self.goNegotiationSuccess,
4140 WPAS_DBUS_IFACE_P2PDEVICE,
4141 "GONegotiationSuccess",
4142 byte_arrays=True)
4143 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4144 "GroupStarted")
4145 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4146 "GroupFinished")
4147 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
4148 "StaDeauthorized")
4149 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4150 "PeerJoined")
4151 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
4152 "PeerDisconnected")
4153 self.loop.run()
4154 return self
4155
4156 def deviceFound(self, path):
4157 logger.debug("deviceFound: path=%s" % path)
4158 args = { 'peer': path, 'wps_method': 'keypad',
4159 'go_intent': 15, 'authorize_only': True }
4160 try:
4161 p2p.Connect(args)
4162 raise Exception("Invalid Connect accepted")
4163 except dbus.exceptions.DBusException, e:
4164 if "InvalidArgs" not in str(e):
4165 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4166
4167 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4168 'go_intent': 15, 'authorize_only': True }
4169 p2p.Connect(args)
4170 p2p.Listen(10)
4171 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4172 if not dev1.discover_peer(addr0):
4173 raise Exception("Peer not found")
4174 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
4175 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4176 if ev is None:
4177 raise Exception("Group formation timed out")
cb346b49 4178 self.sta_group_ev = ev
2ec82e67
JM
4179
4180 def goNegotiationSuccess(self, properties):
4181 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4182
4183 def groupStarted(self, properties):
4184 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4185 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4186 properties['interface_object'])
2ec82e67 4187 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4188 dev1.group_form_result(self.sta_group_ev)
2ec82e67
JM
4189 dev1.remove_group()
4190
4191 def staDeauthorized(self, name):
4192 logger.debug("staDeuthorized: " + name)
cb346b49
JM
4193 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4194 group_p2p.Disconnect()
2ec82e67
JM
4195
4196 def peerJoined(self, peer):
4197 logger.debug("peerJoined: " + peer)
4198 self.peer_joined = True
4199
4200 def peerDisconnected(self, peer):
4201 logger.debug("peerDisconnected: " + peer)
4202 self.peer_disconnected = True
4203
4204 def groupFinished(self, properties):
4205 logger.debug("groupFinished: " + str(properties))
4206 self.done = True
4207 self.loop.quit()
4208
4209 def run_test(self, *args):
4210 logger.debug("run_test")
4211 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4212 return False
4213
4214 def success(self):
4215 return self.done and self.peer_joined and self.peer_disconnected
4216
4217 with TestDbusP2p(bus) as t:
4218 if not t.success():
4219 raise Exception("Expected signals not seen")
4220
4221def test_dbus_p2p_go_neg_init(dev, apdev):
4222 """D-Bus P2P GO Negotiation initiation"""
910eb5b5 4223 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4224 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4225 addr0 = dev[0].p2p_dev_addr()
4226 dev[1].p2p_listen()
4227
4228 class TestDbusP2p(TestDbus):
4229 def __init__(self, bus):
4230 TestDbus.__init__(self, bus)
4231 self.done = False
20fd8def
JM
4232 self.peer_group_added = False
4233 self.peer_group_removed = False
2ec82e67
JM
4234
4235 def __enter__(self):
4236 gobject.timeout_add(1, self.run_test)
4237 gobject.timeout_add(15000, self.timeout)
4238 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4239 "DeviceFound")
4240 self.add_signal(self.goNegotiationSuccess,
4241 WPAS_DBUS_IFACE_P2PDEVICE,
4242 "GONegotiationSuccess",
4243 byte_arrays=True)
4244 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4245 "GroupStarted")
4246 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4247 "GroupFinished")
20fd8def
JM
4248 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4249 "PropertiesChanged")
2ec82e67
JM
4250 self.loop.run()
4251 return self
4252
4253 def deviceFound(self, path):
4254 logger.debug("deviceFound: path=%s" % path)
4255 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4256 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4257 'go_intent': 0 }
4258 p2p.Connect(args)
4259
4260 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4261 if ev is None:
4262 raise Exception("Timeout while waiting for GO Neg Request")
4263 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4264 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4265 if ev is None:
4266 raise Exception("Group formation timed out")
cb346b49 4267 self.sta_group_ev = ev
2ec82e67
JM
4268
4269 def goNegotiationSuccess(self, properties):
4270 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4271
4272 def groupStarted(self, properties):
4273 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4274 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4275 properties['interface_object'])
4276 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4277 group_p2p.Disconnect()
2ec82e67 4278 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4279 dev1.group_form_result(self.sta_group_ev)
2ec82e67
JM
4280 dev1.remove_group()
4281
4282 def groupFinished(self, properties):
4283 logger.debug("groupFinished: " + str(properties))
4284 self.done = True
20fd8def
JM
4285
4286 def propertiesChanged(self, interface_name, changed_properties,
4287 invalidated_properties):
4288 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4289 if interface_name != WPAS_DBUS_P2P_PEER:
4290 return
4291 if "Groups" not in changed_properties:
4292 return
4293 if len(changed_properties["Groups"]) > 0:
4294 self.peer_group_added = True
4295 if len(changed_properties["Groups"]) == 0:
4296 self.peer_group_removed = True
4297 self.loop.quit()
2ec82e67
JM
4298
4299 def run_test(self, *args):
4300 logger.debug("run_test")
4301 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4302 return False
4303
4304 def success(self):
20fd8def
JM
4305 return self.done and self.peer_group_added and self.peer_group_removed
4306
4307 with TestDbusP2p(bus) as t:
4308 if not t.success():
4309 raise Exception("Expected signals not seen")
4310
4311def test_dbus_p2p_group_termination_by_go(dev, apdev):
4312 """D-Bus P2P group removal on GO terminating the group"""
4313 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4314 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4315 addr0 = dev[0].p2p_dev_addr()
4316 dev[1].p2p_listen()
4317
4318 class TestDbusP2p(TestDbus):
4319 def __init__(self, bus):
4320 TestDbus.__init__(self, bus)
4321 self.done = False
4322 self.peer_group_added = False
4323 self.peer_group_removed = False
4324
4325 def __enter__(self):
4326 gobject.timeout_add(1, self.run_test)
4327 gobject.timeout_add(15000, self.timeout)
4328 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4329 "DeviceFound")
4330 self.add_signal(self.goNegotiationSuccess,
4331 WPAS_DBUS_IFACE_P2PDEVICE,
4332 "GONegotiationSuccess",
4333 byte_arrays=True)
4334 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4335 "GroupStarted")
4336 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4337 "GroupFinished")
4338 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4339 "PropertiesChanged")
4340 self.loop.run()
4341 return self
4342
4343 def deviceFound(self, path):
4344 logger.debug("deviceFound: path=%s" % path)
4345 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4346 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4347 'go_intent': 0 }
4348 p2p.Connect(args)
4349
4350 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4351 if ev is None:
4352 raise Exception("Timeout while waiting for GO Neg Request")
4353 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4354 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4355 if ev is None:
4356 raise Exception("Group formation timed out")
4357 self.sta_group_ev = ev
4358
4359 def goNegotiationSuccess(self, properties):
4360 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4361
4362 def groupStarted(self, properties):
4363 logger.debug("groupStarted: " + str(properties))
4364 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4365 properties['interface_object'])
4366 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4367 dev1.group_form_result(self.sta_group_ev)
4368 dev1.remove_group()
4369
4370 def groupFinished(self, properties):
4371 logger.debug("groupFinished: " + str(properties))
4372 self.done = True
4373
4374 def propertiesChanged(self, interface_name, changed_properties,
4375 invalidated_properties):
4376 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4377 if interface_name != WPAS_DBUS_P2P_PEER:
4378 return
4379 if "Groups" not in changed_properties:
4380 return
4381 if len(changed_properties["Groups"]) > 0:
4382 self.peer_group_added = True
6fad40df 4383 if len(changed_properties["Groups"]) == 0 and self.peer_group_added:
20fd8def
JM
4384 self.peer_group_removed = True
4385 self.loop.quit()
4386
4387 def run_test(self, *args):
4388 logger.debug("run_test")
4389 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4390 return False
4391
4392 def success(self):
4393 return self.done and self.peer_group_added and self.peer_group_removed
4394
4395 with TestDbusP2p(bus) as t:
4396 if not t.success():
4397 raise Exception("Expected signals not seen")
4398
4399def test_dbus_p2p_group_idle_timeout(dev, apdev):
4400 """D-Bus P2P group removal on idle timeout"""
4401 try:
4402 dev[0].global_request("SET p2p_group_idle 1")
4403 _test_dbus_p2p_group_idle_timeout(dev, apdev)
4404 finally:
4405 dev[0].global_request("SET p2p_group_idle 0")
4406
4407def _test_dbus_p2p_group_idle_timeout(dev, apdev):
4408 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4409 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4410 addr0 = dev[0].p2p_dev_addr()
4411 dev[1].p2p_listen()
4412
4413 class TestDbusP2p(TestDbus):
4414 def __init__(self, bus):
4415 TestDbus.__init__(self, bus)
4416 self.done = False
845d48c1 4417 self.group_started = False
20fd8def
JM
4418 self.peer_group_added = False
4419 self.peer_group_removed = False
4420
4421 def __enter__(self):
4422 gobject.timeout_add(1, self.run_test)
4423 gobject.timeout_add(15000, self.timeout)
4424 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4425 "DeviceFound")
4426 self.add_signal(self.goNegotiationSuccess,
4427 WPAS_DBUS_IFACE_P2PDEVICE,
4428 "GONegotiationSuccess",
4429 byte_arrays=True)
4430 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4431 "GroupStarted")
4432 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4433 "GroupFinished")
4434 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4435 "PropertiesChanged")
4436 self.loop.run()
4437 return self
4438
4439 def deviceFound(self, path):
4440 logger.debug("deviceFound: path=%s" % path)
4441 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4442 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4443 'go_intent': 0 }
4444 p2p.Connect(args)
4445
4446 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4447 if ev is None:
4448 raise Exception("Timeout while waiting for GO Neg Request")
4449 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4450 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4451 if ev is None:
4452 raise Exception("Group formation timed out")
4453 self.sta_group_ev = ev
4454
4455 def goNegotiationSuccess(self, properties):
4456 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4457
4458 def groupStarted(self, properties):
4459 logger.debug("groupStarted: " + str(properties))
845d48c1 4460 self.group_started = True
20fd8def
JM
4461 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4462 properties['interface_object'])
4463 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4464 dev1.group_form_result(self.sta_group_ev)
4465 ifaddr = dev1.group_request("STA-FIRST").splitlines()[0]
4466 # Force disassociation with different reason code so that the
4467 # P2P Client using D-Bus does not get normal group termination event
4468 # from the GO.
4469 dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
4470 dev1.remove_group()
4471
4472 def groupFinished(self, properties):
4473 logger.debug("groupFinished: " + str(properties))
4474 self.done = True
4475
4476 def propertiesChanged(self, interface_name, changed_properties,
4477 invalidated_properties):
4478 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4479 if interface_name != WPAS_DBUS_P2P_PEER:
4480 return
845d48c1
JM
4481 if not self.group_started:
4482 return
20fd8def
JM
4483 if "Groups" not in changed_properties:
4484 return
4485 if len(changed_properties["Groups"]) > 0:
4486 self.peer_group_added = True
4487 if len(changed_properties["Groups"]) == 0:
4488 self.peer_group_removed = True
4489 self.loop.quit()
4490
4491 def run_test(self, *args):
4492 logger.debug("run_test")
4493 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4494 return False
4495
4496 def success(self):
4497 return self.done and self.peer_group_added and self.peer_group_removed
2ec82e67
JM
4498
4499 with TestDbusP2p(bus) as t:
4500 if not t.success():
4501 raise Exception("Expected signals not seen")
4502
4503def test_dbus_p2p_wps_failure(dev, apdev):
4504 """D-Bus P2P WPS failure"""
910eb5b5 4505 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4506 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4507 addr0 = dev[0].p2p_dev_addr()
4508
4509 class TestDbusP2p(TestDbus):
4510 def __init__(self, bus):
4511 TestDbus.__init__(self, bus)
084780f1
JM
4512 self.wps_failed = False
4513 self.formation_failure = False
2ec82e67
JM
4514
4515 def __enter__(self):
4516 gobject.timeout_add(1, self.run_test)
4517 gobject.timeout_add(15000, self.timeout)
4518 self.add_signal(self.goNegotiationRequest,
4519 WPAS_DBUS_IFACE_P2PDEVICE,
4520 "GONegotiationRequest",
4521 byte_arrays=True)
4522 self.add_signal(self.goNegotiationSuccess,
4523 WPAS_DBUS_IFACE_P2PDEVICE,
4524 "GONegotiationSuccess",
4525 byte_arrays=True)
4526 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4527 "GroupStarted")
4528 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
4529 "WpsFailed")
084780f1
JM
4530 self.add_signal(self.groupFormationFailure,
4531 WPAS_DBUS_IFACE_P2PDEVICE,
4532 "GroupFormationFailure")
2ec82e67
JM
4533 self.loop.run()
4534 return self
4535
f5d5161d
JM
4536 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4537 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
2ec82e67
JM
4538 if dev_passwd_id != 1:
4539 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4540 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4541 'go_intent': 15 }
4542 p2p.Connect(args)
4543
4544 def goNegotiationSuccess(self, properties):
4545 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4546
4547 def groupStarted(self, properties):
4548 logger.debug("groupStarted: " + str(properties))
4549 raise Exception("Unexpected GroupStarted")
4550
4551 def wpsFailed(self, name, args):
4552 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
084780f1
JM
4553 self.wps_failed = True
4554 if self.formation_failure:
4555 self.loop.quit()
4556
4557 def groupFormationFailure(self, reason):
4558 logger.debug("groupFormationFailure - reason=%s" % reason)
4559 self.formation_failure = True
4560 if self.wps_failed:
4561 self.loop.quit()
2ec82e67
JM
4562
4563 def run_test(self, *args):
4564 logger.debug("run_test")
4565 p2p.Listen(10)
4566 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4567 if not dev1.discover_peer(addr0):
4568 raise Exception("Peer not found")
4569 dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
4570 return False
4571
4572 def success(self):
084780f1 4573 return self.wps_failed and self.formation_failure
2ec82e67
JM
4574
4575 with TestDbusP2p(bus) as t:
4576 if not t.success():
4577 raise Exception("Expected signals not seen")
4578
4579def test_dbus_p2p_two_groups(dev, apdev):
4580 """D-Bus P2P with two concurrent groups"""
910eb5b5 4581 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4582 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4583
4584 dev[0].request("SET p2p_no_group_iface 0")
4585 addr0 = dev[0].p2p_dev_addr()
4586 addr1 = dev[1].p2p_dev_addr()
4587 addr2 = dev[2].p2p_dev_addr()
4588 dev[1].p2p_start_go(freq=2412)
cb346b49 4589 dev1_group_ifname = dev[1].group_ifname
2ec82e67
JM
4590
4591 class TestDbusP2p(TestDbus):
4592 def __init__(self, bus):
4593 TestDbus.__init__(self, bus)
4594 self.done = False
4595 self.peer = None
4596 self.go = None
4597 self.group1 = None
4598 self.group2 = None
5a217649 4599 self.groups_removed = False
2ec82e67
JM
4600
4601 def __enter__(self):
4602 gobject.timeout_add(1, self.run_test)
4603 gobject.timeout_add(15000, self.timeout)
4604 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4605 "PropertiesChanged", byte_arrays=True)
4606 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4607 "DeviceFound")
4608 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4609 "GroupStarted")
4610 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4611 "GroupFinished")
4612 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4613 "PeerJoined")
4614 self.loop.run()
4615 return self
4616
4617 def propertiesChanged(self, interface_name, changed_properties,
4618 invalidated_properties):
4619 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4620
4621 def deviceFound(self, path):
4622 logger.debug("deviceFound: path=%s" % path)
4623 if addr2.replace(':','') in path:
4624 self.peer = path
4625 elif addr1.replace(':','') in path:
4626 self.go = path
4627 if self.go and not self.group1:
4628 logger.info("Join the group")
4629 p2p.StopFind()
4630 pin = '12345670'
4631 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
4632 dev1.group_ifname = dev1_group_ifname
4633 dev1.group_request("WPS_PIN any " + pin)
2ec82e67
JM
4634 args = { 'peer': self.go,
4635 'join': True,
4636 'wps_method': 'pin',
4637 'pin': pin,
4638 'frequency': 2412 }
4639 p2p.Connect(args)
4640
4641 def groupStarted(self, properties):
4642 logger.debug("groupStarted: " + str(properties))
4643 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4644 dbus_interface=dbus.PROPERTIES_IFACE)
4645 logger.debug("p2pdevice properties: " + str(prop))
4646
4647 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4648 properties['group_object'])
4649 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4650 dbus_interface=dbus.PROPERTIES_IFACE,
4651 byte_arrays=True)
4652 logger.debug("Group properties: " + str(res))
4653
4654 if not self.group1:
4655 self.group1 = properties['group_object']
4656 self.group1iface = properties['interface_object']
4657 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4658 self.group1iface)
4659
4660 logger.info("Start autonomous GO")
4661 params = dbus.Dictionary({ 'frequency': 2412 })
4662 p2p.GroupAdd(params)
4663 elif not self.group2:
4664 self.group2 = properties['group_object']
4665 self.group2iface = properties['interface_object']
4666 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4667 self.group2iface)
4668 self.g2_bssid = res['BSSID']
4669
4670 if self.group1 and self.group2:
4671 logger.info("Authorize peer to join the group")
4672 a2 = binascii.unhexlify(addr2.replace(':',''))
4673 params = { 'Role': 'enrollee',
4674 'P2PDeviceAddress': dbus.ByteArray(a2),
4675 'Bssid': dbus.ByteArray(a2),
4676 'Type': 'pin',
4677 'Pin': '12345670' }
4678 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
4679 g_wps.Start(params)
4680
4681 bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
4682 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
4683 dev2.scan_for_bss(bssid, freq=2412)
4684 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
cb346b49
JM
4685 ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
4686 if ev is None:
4687 raise Exception("Group join timed out")
4688 self.dev2_group_ev = ev
2ec82e67
JM
4689
4690 def groupFinished(self, properties):
4691 logger.debug("groupFinished: " + str(properties))
4692
4693 if self.group1 == properties['group_object']:
4694 self.group1 = None
4695 elif self.group2 == properties['group_object']:
4696 self.group2 = None
4697
4698 if not self.group1 and not self.group2:
4699 self.done = True
4700 self.loop.quit()
4701
4702 def peerJoined(self, peer):
4703 logger.debug("peerJoined: " + peer)
5a217649
JM
4704 if self.groups_removed:
4705 return
2ec82e67
JM
4706 self.check_results()
4707
4708 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
cb346b49 4709 dev2.group_form_result(self.dev2_group_ev)
2ec82e67
JM
4710 dev2.remove_group()
4711
4712 logger.info("Disconnect group2")
4713 group_p2p = dbus.Interface(self.g2_if_obj,
4714 WPAS_DBUS_IFACE_P2PDEVICE)
4715 group_p2p.Disconnect()
4716
4717 logger.info("Disconnect group1")
4718 group_p2p = dbus.Interface(self.g1_if_obj,
4719 WPAS_DBUS_IFACE_P2PDEVICE)
4720 group_p2p.Disconnect()
5a217649 4721 self.groups_removed = True
2ec82e67
JM
4722
4723 def check_results(self):
4724 logger.info("Check results with two concurrent groups in operation")
4725
4726 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
4727 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
4728 dbus_interface=dbus.PROPERTIES_IFACE,
4729 byte_arrays=True)
4730
4731 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
4732 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
4733 dbus_interface=dbus.PROPERTIES_IFACE,
4734 byte_arrays=True)
4735
4736 logger.info("group1 = " + self.group1)
4737 logger.debug("Group properties: " + str(res1))
4738
4739 logger.info("group2 = " + self.group2)
4740 logger.debug("Group properties: " + str(res2))
4741
4742 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4743 dbus_interface=dbus.PROPERTIES_IFACE)
4744 logger.debug("p2pdevice properties: " + str(prop))
4745
4746 if res1['Role'] != 'client':
4747 raise Exception("Group1 role reported incorrectly: " + res1['Role'])
4748 if res2['Role'] != 'GO':
4749 raise Exception("Group2 role reported incorrectly: " + res2['Role'])
4750 if prop['Role'] != 'device':
4751 raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])
4752
4753 if len(res2['Members']) != 1:
4754 raise Exception("Unexpected Members value for group 2")
4755
4756 def run_test(self, *args):
4757 logger.debug("run_test")
4758 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4759 return False
4760
4761 def success(self):
4762 return self.done
4763
4764 with TestDbusP2p(bus) as t:
4765 if not t.success():
4766 raise Exception("Expected signals not seen")
4767
4768 dev[1].remove_group()
0e126c6d 4769
f572ae80
JM
4770def test_dbus_p2p_cancel(dev, apdev):
4771 """D-Bus P2P Cancel"""
4772 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4773 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4774 try:
4775 p2p.Cancel()
4776 raise Exception("Unexpected p2p.Cancel() success")
4777 except dbus.exceptions.DBusException, e:
4778 pass
4779
4780 addr0 = dev[0].p2p_dev_addr()
4781 dev[1].p2p_listen()
4782
4783 class TestDbusP2p(TestDbus):
4784 def __init__(self, bus):
4785 TestDbus.__init__(self, bus)
4786 self.done = False
4787
4788 def __enter__(self):
4789 gobject.timeout_add(1, self.run_test)
4790 gobject.timeout_add(15000, self.timeout)
4791 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4792 "DeviceFound")
4793 self.loop.run()
4794 return self
4795
4796 def deviceFound(self, path):
4797 logger.debug("deviceFound: path=%s" % path)
4798 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4799 'go_intent': 0 }
4800 p2p.Connect(args)
4801 p2p.Cancel()
4802 self.done = True
4803 self.loop.quit()
4804
4805 def run_test(self, *args):
4806 logger.debug("run_test")
4807 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4808 return False
4809
4810 def success(self):
4811 return self.done
4812
4813 with TestDbusP2p(bus) as t:
4814 if not t.success():
4815 raise Exception("Expected signals not seen")
4816
0e126c6d
JM
4817def test_dbus_introspect(dev, apdev):
4818 """D-Bus introspection"""
4819 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4820
4821 res = if_obj.Introspect(WPAS_DBUS_IFACE,
4822 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4823 logger.info("Initial Introspect: " + str(res))
4824 if res is None or "Introspectable" not in res or "GroupStarted" not in res:
4825 raise Exception("Unexpected initial Introspect response: " + str(res))
f0ef6889
DW
4826 if "FastReauth" not in res or "PassiveScan" not in res:
4827 raise Exception("Unexpected initial Introspect response: " + str(res))
0e126c6d
JM
4828
4829 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
4830 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4831 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4832 logger.info("Introspect: " + str(res2))
4833 if res2 is not None:
4834 raise Exception("Unexpected Introspect response")
4835
4836 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"):
4837 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4838 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4839 logger.info("Introspect: " + str(res2))
4840 if res2 is None:
4841 raise Exception("No Introspect response")
4842 if len(res2) >= len(res):
4843 raise Exception("Unexpected Introspect response")
4844
4845 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
4846 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4847 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4848 logger.info("Introspect: " + str(res2))
4849 if res2 is None:
4850 raise Exception("No Introspect response")
4851 if len(res2) >= len(res):
4852 raise Exception("Unexpected Introspect response")
4853
4854 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"):
4855 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
4856 dbus_interface=dbus.INTROSPECTABLE_IFACE)
4857 logger.info("Introspect: " + str(res2))
4858 if res2 is None:
4859 raise Exception("No Introspect response")
4860 if len(res2) >= len(res):
4861 raise Exception("Unexpected Introspect response")
9bbce257
JM
4862
4863def test_dbus_ap(dev, apdev):
4864 """D-Bus AddNetwork for AP mode"""
4865 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4866 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
4867
4868 ssid = "test-wpa2-psk"
4869 passphrase = 'qwertyuiop'
4870
4871 class TestDbusConnect(TestDbus):
4872 def __init__(self, bus):
4873 TestDbus.__init__(self, bus)
4874 self.started = False
4875
4876 def __enter__(self):
4877 gobject.timeout_add(1, self.run_connect)
4878 gobject.timeout_add(15000, self.timeout)
4879 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
4880 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
4881 "NetworkSelected")
4882 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
4883 "PropertiesChanged")
4884 self.loop.run()
4885 return self
4886
4887 def networkAdded(self, network, properties):
4888 logger.debug("networkAdded: %s" % str(network))
4889 logger.debug(str(properties))
4890
4891 def networkSelected(self, network):
4892 logger.debug("networkSelected: %s" % str(network))
4893 self.network_selected = True
4894
4895 def propertiesChanged(self, properties):
4896 logger.debug("propertiesChanged: %s" % str(properties))
4897 if 'State' in properties and properties['State'] == "completed":
4898 self.started = True
4899 self.loop.quit()
4900
4901 def run_connect(self, *args):
4902 logger.debug("run_connect")
4903 args = dbus.Dictionary({ 'ssid': ssid,
4904 'key_mgmt': 'WPA-PSK',
4905 'psk': passphrase,
4906 'mode': 2,
4907 'frequency': 2412 },
4908 signature='sv')
4909 self.netw = iface.AddNetwork(args)
4910 iface.SelectNetwork(self.netw)
4911 return False
4912
4913 def success(self):
4914 return self.started
4915
4916 with TestDbusConnect(bus) as t:
4917 if not t.success():
4918 raise Exception("Expected signals not seen")
4919 dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
d9e2a057
JM
4920
4921def test_dbus_connect_wpa_eap(dev, apdev):
4922 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
4923 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4924 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
4925
4926 ssid = "test-wpa-eap"
4927 params = hostapd.wpa_eap_params(ssid=ssid)
4928 params["wpa"] = "3"
4929 params["rsn_pairwise"] = "CCMP"
4930 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
4931
4932 class TestDbusConnect(TestDbus):
4933 def __init__(self, bus):
4934 TestDbus.__init__(self, bus)
4935 self.done = False
4936
4937 def __enter__(self):
4938 gobject.timeout_add(1, self.run_connect)
4939 gobject.timeout_add(15000, self.timeout)
4940 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
4941 "PropertiesChanged")
4942 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
4943 self.loop.run()
4944 return self
4945
4946 def propertiesChanged(self, properties):
4947 logger.debug("propertiesChanged: %s" % str(properties))
4948 if 'State' in properties and properties['State'] == "completed":
4949 self.done = True
4950 self.loop.quit()
4951
4952 def eap(self, status, parameter):
4953 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
4954
4955 def run_connect(self, *args):
4956 logger.debug("run_connect")
4957 args = dbus.Dictionary({ 'ssid': ssid,
4958 'key_mgmt': 'WPA-EAP',
4959 'eap': 'PEAP',
4960 'identity': 'user',
4961 'password': 'password',
4962 'ca_cert': 'auth_serv/ca.pem',
4963 'phase2': 'auth=MSCHAPV2',
4964 'scan_freq': 2412 },
4965 signature='sv')
4966 self.netw = iface.AddNetwork(args)
4967 iface.SelectNetwork(self.netw)
4968 return False
4969
4970 def success(self):
4971 return self.done
4972
4973 with TestDbusConnect(bus) as t:
4974 if not t.success():
4975 raise Exception("Expected signals not seen")
708ec753
JM
4976
4977def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
4978 """AP_SCAN 2 AP mode and D-Bus Scan()"""
4979 try:
4980 _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
4981 finally:
4982 dev[0].request("AP_SCAN 1")
4983
4984def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
4985 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4986 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
4987
4988 if "OK" not in dev[0].request("AP_SCAN 2"):
4989 raise Exception("Failed to set AP_SCAN 2")
4990
4991 id = dev[0].add_network()
4992 dev[0].set_network(id, "mode", "2")
4993 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open")
4994 dev[0].set_network(id, "key_mgmt", "NONE")
4995 dev[0].set_network(id, "frequency", "2412")
4996 dev[0].set_network(id, "scan_freq", "2412")
4997 dev[0].set_network(id, "disabled", "0")
4998 dev[0].select_network(id)
4999 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5)
5000 if ev is None:
5001 raise Exception("AP failed to start")
5002
5003 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
5004 iface.Scan({'Type': 'active',
5005 'AllowRoam': True,
5006 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
5007 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5008 "AP-DISABLED"], timeout=5)
5009 if ev is None:
5010 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
5011 if "AP-DISABLED" in ev:
5012 raise Exception("Unexpected AP-DISABLED event")
5013 if "retry=1" in ev:
5014 # Wait for the retry to scan happen
5015 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5016 "AP-DISABLED"], timeout=5)
5017 if ev is None:
5018 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5019 if "AP-DISABLED" in ev:
5020 raise Exception("Unexpected AP-DISABLED event - retry")
5021
5022 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
5023 dev[1].request("DISCONNECT")
5024 dev[1].wait_disconnected()
5025 dev[0].request("DISCONNECT")
5026 dev[0].wait_disconnected()
d679ab74
JM
5027
5028def test_dbus_expectdisconnect(dev, apdev):
5029 """D-Bus ExpectDisconnect"""
5030 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5031 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
5032
5033 params = { "ssid": "test-open" }
5034 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
5035 dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")
5036
5037 # This does not really verify the behavior other than by going through the
5038 # code path for additional coverage.
5039 wpas.ExpectDisconnect()
5040 dev[0].request("DISCONNECT")
5041 dev[0].wait_disconnected()
2299b543
JM
5042
5043def test_dbus_save_config(dev, apdev):
5044 """D-Bus SaveConfig"""
5045 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5046 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5047 try:
5048 iface.SaveConfig()
5049 raise Exception("SaveConfig() accepted unexpectedly")
5050 except dbus.exceptions.DBusException, e:
5051 if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
5052 raise Exception("Unexpected error message for SaveConfig(): " + str(e))
ce96e65c
JM
5053
5054def test_dbus_vendor_elem(dev, apdev):
5055 """D-Bus vendor element operations"""
5056 try:
5057 _test_dbus_vendor_elem(dev, apdev)
5058 finally:
5059 dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5060
5061def _test_dbus_vendor_elem(dev, apdev):
5062 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5063 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5064
5065 dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5066
5067 try:
5068 ie = dbus.ByteArray("\x00\x00")
5069 iface.VendorElemAdd(-1, ie)
5070 raise Exception("Invalid VendorElemAdd() accepted")
5071 except dbus.exceptions.DBusException, e:
5072 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5073 raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))
5074
5075 try:
5076 ie = dbus.ByteArray("")
5077 iface.VendorElemAdd(1, ie)
5078 raise Exception("Invalid VendorElemAdd() accepted")
5079 except dbus.exceptions.DBusException, e:
5080 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5081 raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))
5082
5083 try:
5084 ie = dbus.ByteArray("\x00\x01")
5085 iface.VendorElemAdd(1, ie)
5086 raise Exception("Invalid VendorElemAdd() accepted")
5087 except dbus.exceptions.DBusException, e:
5088 if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5089 raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))
5090
5091 try:
5092 iface.VendorElemGet(-1)
5093 raise Exception("Invalid VendorElemGet() accepted")
5094 except dbus.exceptions.DBusException, e:
5095 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5096 raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))
5097
5098 try:
5099 iface.VendorElemGet(1)
5100 raise Exception("Invalid VendorElemGet() accepted")
5101 except dbus.exceptions.DBusException, e:
5102 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5103 raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))
5104
5105 try:
5106 ie = dbus.ByteArray("\x00\x00")
5107 iface.VendorElemRem(-1, ie)
5108 raise Exception("Invalid VendorElemRemove() accepted")
5109 except dbus.exceptions.DBusException, e:
5110 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5111 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5112
5113 try:
5114 ie = dbus.ByteArray("")
5115 iface.VendorElemRem(1, ie)
5116 raise Exception("Invalid VendorElemRemove() accepted")
5117 except dbus.exceptions.DBusException, e:
5118 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5119 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5120
5121 iface.VendorElemRem(1, "*")
5122
5123 ie = dbus.ByteArray("\x00\x01\x00")
5124 iface.VendorElemAdd(1, ie)
5125
5126 val = iface.VendorElemGet(1)
5127 if len(val) != len(ie):
5128 raise Exception("Unexpected VendorElemGet length")
5129 for i in range(len(val)):
5130 if val[i] != dbus.Byte(ie[i]):
5131 raise Exception("Unexpected VendorElemGet data")
5132
5133 ie2 = dbus.ByteArray("\xe0\x00")
5134 iface.VendorElemAdd(1, ie2)
5135
5136 ies = ie + ie2
5137 val = iface.VendorElemGet(1)
5138 if len(val) != len(ies):
5139 raise Exception("Unexpected VendorElemGet length[2]")
5140 for i in range(len(val)):
5141 if val[i] != dbus.Byte(ies[i]):
5142 raise Exception("Unexpected VendorElemGet data[2]")
5143
5144 try:
5145 test_ie = dbus.ByteArray("\x01\x01")
5146 iface.VendorElemRem(1, test_ie)
5147 raise Exception("Invalid VendorElemRemove() accepted")
5148 except dbus.exceptions.DBusException, e:
5149 if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5150 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5151
5152 iface.VendorElemRem(1, ie)
5153 val = iface.VendorElemGet(1)
5154 if len(val) != len(ie2):
5155 raise Exception("Unexpected VendorElemGet length[3]")
5156
5157 iface.VendorElemRem(1, "*")
5158 try:
5159 iface.VendorElemGet(1)
5160 raise Exception("Invalid VendorElemGet() accepted after removal")
5161 except dbus.exceptions.DBusException, e:
5162 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5163 raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))