]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/test_dbus.py
tests: D-Bus vendor element operations
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
1 # wpa_supplicant D-Bus interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import binascii
8 import logging
9 logger = logging.getLogger()
10 import subprocess
11 import time
12
13 try:
14 import gobject
15 import dbus
16 dbus_imported = True
17 except ImportError:
18 dbus_imported = False
19
20 import hostapd
21 from wpasupplicant import WpaSupplicant
22 from utils import HwsimSkip, alloc_fail, fail_test
23 from p2p_utils import *
24 from test_ap_tdls import connect_2sta_open
25 from test_ap_eap import check_altsubject_match_support
26
# D-Bus names used by the tests in this file. The interface names all share
# the service name as a prefix, so they are derived from it.
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"

# Per-interface object and its sub-interfaces
WPAS_DBUS_IFACE = WPAS_DBUS_SERVICE + ".Interface"
WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"

# Other object interfaces
WPAS_DBUS_NETWORK = WPAS_DBUS_SERVICE + ".Network"
WPAS_DBUS_BSS = WPAS_DBUS_SERVICE + ".BSS"
WPAS_DBUS_P2P_PEER = WPAS_DBUS_SERVICE + ".Peer"
WPAS_DBUS_GROUP = WPAS_DBUS_SERVICE + ".Group"
WPAS_DBUS_PERSISTENT_GROUP = WPAS_DBUS_SERVICE + ".PersistentGroup"
37
def prepare_dbus(dev):
    """Connect to wpa_supplicant over the system D-Bus for the given device.

    Returns a (bus, wpas_obj, path, if_obj) tuple: the system bus, the proxy
    for the global wpa_supplicant object, the object path returned by
    GetInterface() for dev.ifname, and the proxy for that path.

    Raises HwsimSkip if the dbus/gobject modules could not be imported or if
    any step of the D-Bus setup fails.
    """
    if not dbus_imported:
        reason = "No dbus module available"
        logger.info(reason)
        raise HwsimSkip(reason)

    try:
        from dbus.mainloop.glib import DBusGMainLoop
        # Install the GLib main loop as the default so that signal receivers
        # registered later on are dispatched from gobject.MainLoop.
        DBusGMainLoop(set_as_default=True)

        bus = dbus.SystemBus()
        wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        path = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE).GetInterface(dev.ifname)
        if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
        return (bus, wpas_obj, path, if_obj)
    except Exception as e:
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
53
class TestDbus(object):
    """Base class for main-loop driven D-Bus test helpers.

    Only __exit__ is defined here; a subclass supplies its own __enter__ to
    be usable in a with statement. Signal receivers registered through
    add_signal() are remembered and removed again in __exit__.
    """

    def __init__(self, bus):
        self.bus = bus
        self.signals = []
        self.loop = gobject.MainLoop()

    def __exit__(self, type, value, traceback):
        # Drop every signal match registered via add_signal(). Nothing is
        # returned, so an exception from the with body keeps propagating.
        for match in self.signals:
            match.remove()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal name on interface and track it."""
        match = self.bus.add_signal_receiver(handler,
                                             dbus_interface=interface,
                                             signal_name=name,
                                             byte_arrays=byte_arrays)
        self.signals.append(match)

    def timeout(self, *args):
        """Timer callback: stop the main loop; False makes it one-shot."""
        logger.debug("timeout")
        self.loop.quit()
        return False
74
class alloc_fail_dbus(object):
    """Context manager that expects a D-Bus call to fail due to a forced
    allocation failure.

    On entry it issues TEST_ALLOC_FAIL <count>:<funcs> to dev. On exit it
    requires the with body to have raised a DBusException whose text
    contains expected; that exception is then suppressed.
    """

    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):
        self._dev = dev
        self._count = count
        self._funcs = funcs
        self._operation = operation
        self._expected = expected

    def __enter__(self):
        response = self._dev.request("TEST_ALLOC_FAIL %d:%s" %
                                     (self._count, self._funcs))
        if "OK" not in response:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # The body completed without raising: the operation was not affected
        # by the allocation failure at all.
        if type is None:
            raise Exception("%s succeeded during out-of-memory" % self._operation)

        # Expected error seen: swallow it.
        is_dbus_error = type == dbus.exceptions.DBusException
        if is_dbus_error and self._expected in str(value):
            return True

        # Some other exception: report if the failure counter was not
        # consumed, otherwise let the original exception propagate.
        remaining = self._dev.request("GET_ALLOC_FAIL")
        if remaining != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" % self._operation)
        return False
95
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Start a WPS-enabled WPA2-PSK AP on ap['ifname'].

    Returns whatever hostapd.add_ap() returns for the new AP.
    """
    params = {
        "ssid": ssid,
        "uuid": ap_uuid,
        "eap_server": "1",
        "wps_state": "2",
        "ap_pin": "12345670",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "wpa_passphrase": "12345678",
    }
    return hostapd.add_ap(ap['ifname'], params)
103
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    # Exercises Properties.GetAll on the global object, the interface object
    # (main and WPS interfaces), a BSS object and a network object, and
    # checks that the BSSs/Networks lists track scan results and added
    # networks.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    # Log the real interface name and path instead of a hand-typed copy.
    logger.debug("GetAll(%s, %s) ==> %s" % (WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                                            str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))

    # Both lists are expected to be empty before any scan/network setup.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected BSSs entry: " + str(res))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected Networks entry: " + str(res))

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing BSSs entry: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    # Format the BSSID byte array as colon-separated lowercase hex so that
    # it can be compared with the string form from apdev.
    bssid_str = ':'.join('%02x' % item for item in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
    ssid = props['Properties']['ssid']
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
163
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read global property prop from wpas_obj and return its value.

    If expect is not None, raise Exception when the value read differs
    from it.
    """
    value = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE,
                         byte_arrays=byte_arrays)
    if expect is None:
        return value
    if value != expect:
        raise Exception("Unexpected %s: %s (expected: %s)" %
                        (prop, str(value), str(expect)))
    return value
172
def dbus_set(dbus, wpas_obj, prop, val):
    """Write val to global property prop on wpas_obj via Properties.Set."""
    properties_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, dbus_interface=properties_iface)
176
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    # Walks through the global properties: valid Get/Set round trips, then
    # a series of invalid requests that must each fail with a specific
    # D-Bus error text.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # DebugLevel: string property with a fixed set of accepted values
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    invalid_levels = [ (3, "Error.Failed: wrong property type"),
                       ("foo", "Error.Failed: wrong debug level value") ]
    for bad_value, expected_err in invalid_levels:
        try:
            dbus_set(dbus, wpas_obj, "DebugLevel", bad_value)
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid DebugLevel value accepted: " + str(bad_value))
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # DebugTimestamp and DebugShowKeys: boolean properties, same sequence
    for flag in ("DebugTimestamp", "DebugShowKeys"):
        dbus_get(dbus, wpas_obj, flag, expect=True)
        dbus_set(dbus, wpas_obj, flag, False)
        dbus_get(dbus, wpas_obj, flag, expect=False)
        try:
            dbus_set(dbus, wpas_obj, flag, "foo")
        except dbus.exceptions.DBusException as e:
            if "Error.Failed: wrong property type" not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid %s value accepted" % flag)
        dbus_set(dbus, wpas_obj, flag, True)
        dbus_get(dbus, wpas_obj, flag, expect=True)

    # Read-only list properties
    ifaces = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(ifaces) != 1:
        raise Exception("Unexpected Interfaces value: " + str(ifaces))

    methods = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(methods) < 5 or "TTLS" not in methods:
        raise Exception("Unexpected EapMethods value: " + str(methods))

    capa = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(capa) < 2 or "p2p" not in capa:
        raise Exception("Unexpected Capabilities value: " + str(capa))

    # WFDIEs: byte array property
    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    wfd_ies = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(wfd_ies))
    readback = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if wfd_ies != readback:
        raise Exception("WFDIEs value changed")
    try:
        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Invalid WFDIEs value accepted")
    # Clear, set again and clear once more before verifying the empty value.
    for ies in ('', wfd_ies, ''):
        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(ies))
    readback = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(readback) != 0:
        raise Exception("WFDIEs not cleared properly")

    # Set on a read-only property
    methods = dbus_get(dbus, wpas_obj, "EapMethods")
    try:
        dbus_set(dbus, wpas_obj, "EapMethods", methods)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Property is read-only" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Invalid Set accepted")

    # Method that does not exist on the Properties interface
    try:
        wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Unknown method accepted")

    # Get with an unknown interface name
    try:
        wpas_obj.Get("foo", "DebugShowKeys",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: No such property" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Invalid Get accepted")

    # Proxy created with introspect=False, used for the calls that pass
    # integers where Get takes strings.
    raw_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                             introspect=False)
    for get_args in [ (123, "DebugShowKeys"), (WPAS_DBUS_SERVICE, 123) ]:
        try:
            raw_obj.Get(get_args[0], get_args[1],
                        dbus_interface=dbus.PROPERTIES_IFACE)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs: Invalid arguments" not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid Get accepted")

    # Set with a doubly wrapped variant value
    try:
        wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
                     dbus.ByteArray('', variant_level=2),
                     dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: invalid message format" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Invalid Set accepted")
299
def test_dbus_set_global_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # (property name, value expected before Set, value written and then
    # expected on the following Get)
    cases = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]

    for name, initial, updated in cases:
        current = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != initial:
            raise Exception("Unexpected " + name + " value: " + str(current))

        if_obj.Set(WPAS_DBUS_IFACE, name, updated,
                   dbus_interface=dbus.PROPERTIES_IFACE)

        current = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != updated:
            raise Exception("Unexpected " + name + " value after set: " + str(current))
319
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Case 1: a method name that is not part of the WPS interface.
    wps_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    try:
        wps_iface.Foo()
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Unknown method accepted")

    # Case 2: an existing method called with an integer argument through a
    # proxy that was created with introspect=False.
    raw_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    raw_wps = dbus.Interface(raw_obj, WPAS_DBUS_IFACE_WPS)
    try:
        raw_wps.Start(123)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("WPS.Start with incorrect signature accepted")
340
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    sta = dev[0]
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        # Put back the two settings that the test body modifies, whether or
        # not it passed.
        for cmd in ("SET wps_cred_processing 0",
                    "SET config_methods display keypad virtual_display nfc_interface p2ps"):
            sta.request(cmd)
348
def _test_dbus_get_set_wps(dev, apdev):
    """Body of test_dbus_get_set_wps.

    Checks the WPS ConfigMethods and ProcessCredentials properties through
    both the control interface and D-Bus, then verifies that changing
    ProcessCredentials emits PropertiesChanged on both the WPS interface and
    the standard Properties interface.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        cred = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE)
        # Only wps_cred_processing=1 is expected to read back as False.
        expected_val = i != 1
        if cred != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, cred))

    class TestDbusGetSet(TestDbus):
        """Toggle ProcessCredentials and wait for both change signals."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # PropertiesChanged on the WPS interface itself
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            # PropertiesChanged on org.freedesktop.DBus.Properties
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # This must be the same attribute that the signal handlers check.
            # With a differently named flag they could never stop the loop
            # and every run would last until the 1000 ms timeout.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
439
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Each dictionary is passed to WPS.Start() and must be rejected with an
    # InvalidArgs error.
    failures = [ {'Role': 'foo', 'Type': 'pbc'},
                 {'Role': 123, 'Type': 'pbc'},
                 {'Type': 'pbc'},
                 {'Role': 'enrollee'},
                 {'Role': 'registrar'},
                 {'Role': 'enrollee', 'Type': 123},
                 {'Role': 'enrollee', 'Type': 'foo'},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'Bssid': '02:33:44:55:66:77'},
                 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'Bssid': dbus.ByteArray('12345')},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'P2PDeviceAddress': 12345},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'P2PDeviceAddress': dbus.ByteArray('12345')},
                 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
    for args in failures:
        try:
            wps.Start(args)
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
469
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    # Every with block below arms one allocation failure and then issues the
    # D-Bus call that is expected to fail because of it.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    sta = dev[0]

    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_state", "Get"):
        if_obj.Get(WPAS_DBUS_IFACE, "State",
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Make sure there is one BSS entry for the BSS related getters.
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq=2412)

    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_bsss", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    # Add one network for the Networks getter.
    net_id = sta.add_network()
    sta.set_network(net_id, "disabled", "0")
    sta.set_network_quoted(net_id, "ssid", "test")

    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_networks", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(sta, 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    eap_funcs = "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods"
    for count in range(1, 6):
        with alloc_fail_dbus(sta, count, eap_funcs, "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    with alloc_fail_dbus(sta, 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        val2 = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    start_funcs = "=wpa_config_add_network;wpas_dbus_handler_wps_start"
    with alloc_fail_dbus(sta, 1, start_funcs, "WPS.Start",
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
521
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; set it back to 0 even
        # when the body raised.
        sta.request("SET wps_cred_processing 0")
528
def _test_dbus_wps_pbc(dev, apdev):
    """Body of test_dbus_wps_pbc.

    Starts a WPS AP in PBC mode, checks the WPS dictionary of the BSS
    object, then runs WPS.Start() in PBC mode and waits for both the
    "success" event and the Credentials signal.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_paths) != 1:
        raise Exception("Missing BSSs entry: " + str(bss_paths))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_paths[0], str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    wps_info = props['WPS']
    if 'Type' not in wps_info:
        raise Exception("No Type field in the WPS dictionary")
    if wps_info['Type'] != 'pbc':
        raise Exception("Unexpected WPS Type: " + wps_info['Type'])

    class TestDbusWps(TestDbus):
        """Run WPS PBC and collect the resulting signals."""

        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            # Stop once both expected signals have been seen.
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
598
def test_dbus_wps_pbc_overlap(dev, apdev):
    """D-Bus WPS/PBC operation and signal for PBC overlap"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Two APs, both with PBC activated.
    hapd = start_ap(apdev[0])
    hapd2 = start_ap(apdev[1], ssid="test-wps2",
                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
    hapd.request("WPS_PBC")
    hapd2.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq="2412")

    class TestDbusWps(TestDbus):
        """Start WPS PBC and wait for the pbc-overlap event."""

        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.overlap_seen = False

        def __enter__(self):
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "pbc-overlap":
                return
            self.overlap_seen = True
            self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def success(self):
            return self.overlap_seen

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    for cmd in ("WPS_CANCEL", "DISCONNECT"):
        dev[0].request(cmd)
    hapd.disable()
    dev[0].flush_scan_cache()
649
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; set it back to 0 even
        # when the body raised.
        sta.request("SET wps_cred_processing 0")
656
def _test_dbus_wps_pin(dev, apdev):
    """Body of test_dbus_wps_pin.

    Enables a fixed PIN on the AP, runs WPS.Start() in PIN mode with the
    same PIN and the AP's BSSID, and waits for both the "success" event and
    the Credentials signal.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Run WPS PIN and collect the resulting signals."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # The colon-separated BSSID string is converted to raw bytes.
            raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
710
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; set it back to 0 even
        # when the body raised.
        sta.request("SET wps_cred_processing 0")
717
def _test_dbus_wps_pin2(dev, apdev):
    """Body of test_dbus_wps_pin2.

    Calls WPS.Start() in PIN mode without a PIN, passes the 'Pin' value
    from the reply to the AP, and waits for both the "success" event and
    the Credentials signal.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Run WPS PIN with a generated PIN and collect the signals."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # wpsEvent() and success() read this flag, so it has to exist
            # even if the Credentials signal has not arrived (yet);
            # otherwise they fail with AttributeError.
            self.credentials_received = False
            self.failed = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            # No 'Pin' was given, so the one to use is taken from the reply.
            pin = res['Pin']
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
773
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; set it back to 0 even
        # when the body raised.
        sta.request("SET wps_cred_processing 0")
780
def _test_dbus_wps_pin_m2d(dev, apdev):
    """Body of test_dbus_wps_pin_m2d.

    Runs WPS.Start() in PIN mode before the PIN has been enabled on the AP.
    The PIN is enabled on the AP from the "m2d" event handler, and the test
    then waits for both the "success" event and the Credentials signal.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Run WPS PIN, reacting to m2d, and collect the signals."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "m2d":
                # Enable the PIN on the AP now that M2D has been reported.
                ap = hostapd.Hostapd(apdev[0]['ifname'])
                ap.request("WPS_PIN any 12345670")
                return
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
836
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; set it back to 0 even
        # when the body raised.
        sta.request("SET wps_cred_processing 0")
843
def _test_dbus_wps_reg(dev, apdev):
    """Body of test_dbus_wps_reg.

    Runs WPS.Start() with Role 'registrar' and the AP's BSSID, and waits
    for the Credentials signal.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Run WPS as registrar and wait for credentials."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            # Events are only logged; the Credentials signal ends the wait.
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
            start_args = {'Role': 'registrar', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
891
def test_dbus_wps_cancel(dev, apdev):
    """D-Bus WPS Cancel operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']

    # First Cancel() is issued before any WPS operation has been started.
    wps.Cancel()

    # Second Cancel() follows directly after starting a PIN operation.
    dev[0].scan_for_bss(bssid, freq="2412")
    raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
    start_args = {'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                  'Bssid': dbus.ByteArray(raw_bssid)}
    wps.Start(start_args)
    wps.Cancel()
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
907
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # 17 single-entry SSIDs "1" .. "17".
    too_many_ssids = [ dbus.ByteArray(str(n)) for n in range(1, 18) ]
    # A single 33-character SSID.
    too_long_ssid = [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]

    # (Scan() argument dictionary, substring expected in the D-Bus error)
    tests = [
        ({}, "InvalidArgs"),
        ({'Type': 123}, "InvalidArgs"),
        ({'Type': 'foo'}, "InvalidArgs"),
        ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
        ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
        ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
        ({'Type': 'active', 'SSIDs': too_many_ssids}, "InvalidArgs"),
        ({'Type': 'active', 'SSIDs': too_long_ssid}, "InvalidArgs"),
        ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
        ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
        ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
        ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
        ({'Type': 'active',
          'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
         "InvalidArgs"),
        ({'Type': 'active',
          'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
         "InvalidArgs"),
        ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
        ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
         "InvalidArgs"),
        ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
         "InvalidArgs"),
    ]

    for scan_args, err in tests:
        try:
            iface.Scan(scan_args)
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(scan_args), str(e)))
        else:
            # Scan() returned without a D-Bus error.
            raise Exception("Invalid Scan() arguments accepted: " + str(scan_args))
955
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def chan_2412():
        # Fresh Channels argument used by every Scan() call below.
        return [ (dbus.UInt32(2412), dbus.UInt32(20)) ]

    # Allocation failure in wpa_scan_clone_params
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_scan_clone_params;wpas_dbus_handler_scan",
                         "Scan", expected="ScanError: Scan request rejected"):
        iface.Scan({ 'Type': 'passive', 'Channels': chan_2412() })

    # Allocation failure in wpas_dbus_get_scan_channels
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({ 'Type': 'passive', 'Channels': chan_2412() })

    # Allocation failure in wpas_dbus_get_scan_ies
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({ 'Type': 'active',
                     'IEs': [ dbus.ByteArray("\xdd\x00") ],
                     'Channels': chan_2412() })

    # Allocation failure in wpas_dbus_get_scan_ssids
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({ 'Type': 'active',
                     'SSIDs': [ dbus.ByteArray("open"), dbus.ByteArray() ],
                     'Channels': chan_2412() })
987
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })

    class TestDbusScan(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = 0
            self.bss_added = False
            self.fail_reason = None

        def __enter__(self):
            # Kick off the first scan almost immediately, arm a 15 s
            # watchdog, subscribe to the scan signals and run the loop.
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal_name in ((self.scanDone, "ScanDone"),
                                         (self.bssAdded, "BSSAdded"),
                                         (self.bssRemoved, "BSSRemoved")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def scanDone(self, success):
            # Each completed scan triggers the next one; the loop ends after
            # the third completion once a BSS has been reported.
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            if self.scan_completed == 1:
                iface.Scan({'Type': 'passive',
                            'AllowRoam': True,
                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
            elif self.scan_completed == 2:
                iface.Scan({'Type': 'passive',
                            'AllowRoam': False})
            elif self.bss_added and self.scan_completed == 3:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            if 'WPS' in properties and 'Type' in properties['WPS']:
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            first_scan = {'Type': 'active',
                          'SSIDs': [ dbus.ByteArray("open"),
                                     dbus.ByteArray() ],
                          'IEs': [ dbus.ByteArray("\xdd\x00"),
                                   dbus.ByteArray() ],
                          'AllowRoam': False,
                          'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(first_scan)
            # False: do not re-run this timeout callback.
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as ctx:
        if ctx.fail_reason:
            raise Exception(ctx.fail_reason)
        if not ctx.success():
            raise Exception("Expected signals not seen")

    def current_bss_list():
        # Read the BSSs property of the interface object.
        return if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    if len(current_bss_list()) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    if len(current_bss_list()) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
1068
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan through the control interface and wait until it is
    # actually running.
    if "OK" not in dev[0].request("SCAN freq=2412-2462"):
        raise Exception("Failed to start scan")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    # A D-Bus Scan() issued now has to be rejected.
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    # Let the original scan run to completion.
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1090
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            # Step counter for the connect/disconnect sequence; 9 means the
            # whole sequence completed, -1 marks a bad SignalPoll() result.
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal_name in (
                    (self.networkAdded, "NetworkAdded"),
                    (self.networkRemoved, "NetworkRemoved"),
                    (self.networkSelected, "NetworkSelected"),
                    (self.propertiesChanged, "PropertiesChanged")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')
            if new_state == "completed":
                self._on_completed()
            elif new_state == "disconnected":
                self._on_disconnected()

        def _on_completed(self):
            # Advance the sequence each time the connection completes.
            if self.state == 0:
                self.state = 1
                iface.Disconnect()
            elif self.state == 2:
                self.state = 3
                iface.Disconnect()
            elif self.state == 4:
                self.state = 5
                iface.Reattach()
            elif self.state == 5:
                self.state = 6
                iface.Disconnect()
            elif self.state == 7:
                self.state = 8
                res = iface.SignalPoll()
                logger.debug("SignalPoll: " + str(res))
                if 'frequency' not in res or res['frequency'] != 2412:
                    self.state = -1
                    logger.info("Unexpected SignalPoll result")
                iface.RemoveNetwork(self.netw)

        def _on_disconnected(self):
            # Advance the sequence each time the interface disconnects.
            if self.state == 1:
                self.state = 2
                iface.SelectNetwork(self.netw)
            elif self.state == 3:
                self.state = 4
                iface.Reassociate()
            elif self.state == 6:
                self.state = 7
                iface.Reconnect()
            elif self.state == 8:
                self.state = 9
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            net_args = dbus.Dictionary({ 'ssid': ssid,
                                         'key_mgmt': 'WPA-PSK',
                                         'psk': passphrase,
                                         'scan_freq': 2412 },
                                       signature='sv')
            self.netw = iface.AddNetwork(net_args)
            iface.SelectNetwork(self.netw)
            # False: do not re-run this timeout callback.
            return False

        def success(self):
            all_signals = (self.network_added and
                           self.network_removed and
                           self.network_selected)
            if not all_signals:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as ctx:
        if not ctx.success():
            raise Exception("Expected signals not seen")
1193
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal_name in (
                    (self.propertiesChanged, "PropertiesChanged"),
                    (self.networkRequest, "NetworkRequest")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') == "completed":
                self.connected = True
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            # The network is added without a psk, so the passphrase is
            # supplied here when it is requested.
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field == "PSK_PASSPHRASE":
                iface.NetworkReply(path, field, '"' + passphrase + '"')

        def run_connect(self, *args):
            logger.debug("run_connect")
            net_args = dbus.Dictionary({ 'ssid': ssid,
                                         'key_mgmt': 'WPA-PSK',
                                         'mem_only_psk': 1,
                                         'scan_freq': 2412 },
                                       signature='sv')
            self.netw = iface.AddNetwork(net_args)
            iface.SelectNetwork(self.netw)
            # False: do not re-run this timeout callback.
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as ctx:
        if not ctx.success():
            raise Exception("Expected signals not seen")
1247
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Probe for TEST_ALLOC_FAIL support; skip the test without it.
    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            # Step counter for the connect/disconnect sequence; 7 means the
            # whole sequence completed, -1 marks a bad SignalPoll() result.
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            # Short (1.5 s) watchdog since this runs once per iteration.
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({ 'ssid': ssid,
                                     'key_mgmt': 'WPA-PSK',
                                     'psk': passphrase,
                                     'scan_freq': 2412 },
                                   signature='sv')
            # Either D-Bus call may fail while allocation failures are
            # armed; log the error and stop the loop instead of raising.
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            # False: do not re-run this timeout callback.
            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Arm the allocation failure at the i-th allocation and run the connect
    # sequence. Stop after five iterations in which GET_ALLOC_FAIL did not
    # report a value starting with '0:'.
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

                state = dev[0].request('GET_ALLOC_FAIL')
                logger.info("GET_ALLOC_FAIL: " + state)
                dev[0].dump_monitor()
                dev[0].request("TEST_ALLOC_FAIL 0:")
                # NOTE(review): this exception is swallowed by the handler
                # below, so for i < 3 it only skips the counting that
                # follows. Confirm whether it was meant to fail the test.
                if i < 3:
                    raise Exception("Connection succeeded during out-of-memory")
                if not state.startswith('0:'):
                    count += 1
                    if count == 5:
                        break
        except Exception:
            # Best-effort: errors during an OOM iteration are ignored.
            # Catching Exception rather than using a bare except keeps
            # KeyboardInterrupt/SystemExit able to stop this long loop.
            pass

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1389
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Disconnect() has to fail with NotConnected.
    try:
        iface.Disconnect()
    except dbus.exceptions.DBusException as e:
        if "NotConnected" not in str(e):
            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
    else:
        raise Exception("Disconnect() accepted when not connected")

    # Reattach() has to fail with NotConnected.
    try:
        iface.Reattach()
    except dbus.exceptions.DBusException as e:
        if "NotConnected" not in str(e):
            raise Exception("Unexpected error message for invalid Reattach: " + str(e))
    else:
        raise Exception("Reattach() accepted when not connected")
1408
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            # Step counter; 5 means the AltSubject mismatch was reported
            # after the non-matching constraint was set.
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def _set_altsubject_match(self, value):
            # Update altsubject_match through the network object's
            # Properties property.
            net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
            new_props = dbus.Dictionary({ 'altsubject_match': value },
                                        signature='sv')
            net_obj.Set(WPAS_DBUS_NETWORK, "Properties", new_props,
                        dbus_interface=dbus.PROPERTIES_IFACE)

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')
            if new_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    self._set_altsubject_match(self.server_dnsname)
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    self._set_altsubject_match(self.server_dnsname + "FOO")
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            if len(args['altsubject']) < 1:
                raise Exception("Missing dNSName")
            dnsname = args['altsubject'][0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            mismatch = (status == 'remote certificate verification' and
                        parameter == 'AltSubject mismatch')
            if self.state == 4 and mismatch:
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            # The network is added without a password, so it is supplied
            # here when it is requested.
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field == "PASSWORD":
                iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            net_args = dbus.Dictionary({ 'ssid': ssid,
                                         'key_mgmt': 'IEEE8021X',
                                         'eapol_flags': 0,
                                         'eap': 'TTLS',
                                         'anonymous_identity': 'ttls',
                                         'identity': 'pap user',
                                         'ca_cert': 'auth_serv/ca.pem',
                                         'phase2': 'auth=PAP',
                                         'scan_freq': 2412 },
                                       signature='sv')
            self.netw = iface.AddNetwork(net_args)
            iface.SelectNetwork(self.netw)
            # False: do not re-run this timeout callback.
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 5

    with TestDbusConnect(bus) as ctx:
        if not ctx.success():
            raise Exception("Expected signals not seen")
1523
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Add a network using typed D-Bus values and check scan_freq through
    # the control interface.
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'WPA-PSK',
                             'psk': "12345678",
                             'identity': dbus.ByteArray([ 1, 2 ]),
                             'priority': dbus.Int32(0),
                             'scan_freq': dbus.UInt32(2412) },
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # Frequency lists given as space-separated strings.
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'NONE',
                             'scan_freq': "2412 2432",
                             'freq_list': "2412 2417 2432" },
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(net_id, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # Operations on the already removed network have to be rejected.
    try:
        iface.RemoveNetwork(netw)
        raise Exception("Invalid RemoveNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    try:
        iface.SelectNetwork(netw)
        raise Exception("Invalid SelectNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            # This message used to name RemoveNetwork (copy-paste error).
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: expected False after AddNetwork, then toggled.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    # A non-boolean value for Enabled has to be rejected.
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(Enabled,1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))

    # Updating one parameter through Properties must leave others alone.
    args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # Called a second time with no networks left.
    iface.RemoveAllNetworks()

    # Argument dictionaries that AddNetwork() has to reject.
    tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
              dbus.Dictionary({ 'identity': dbus.ByteArray() },
                              signature='sv'),
              dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
              dbus.Dictionary({ 'identity': "" }, signature='sv') ]
    for args in tests:
        try:
            iface.AddNetwork(args)
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1636
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def add_then_remove(net_args):
        # Currently, AddNetwork() succeeds even if os_strdup() for path or
        # the network registration fails, so remove the network if that
        # occurs. A D-Bus error from either call is ignored.
        try:
            added = iface.AddNetwork(net_args)
            iface.RemoveNetwork(added)
        except dbus.exceptions.DBusException:
            pass

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)

    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                               signature='sv')
        add_then_remove(args)

    # Same network arguments, failing the first and then the second
    # allocation in wpas_dbus_register_network.
    for i in range(1, 3):
        with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
            add_then_remove(args)

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                               signature='sv')
        netw = iface.AddNetwork(args)

    # (allocation count, function match, AddNetwork() arguments)
    in_set_props = '=set_network_properties;wpas_dbus_handler_add_network'
    tests = [
        (1,
         'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
         dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') }, signature='sv')),
        (1, in_set_props,
         dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
        (1, in_set_props,
         dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
        (1, in_set_props,
         dbus.Dictionary({ 'priority': dbus.UInt32(1) }, signature='sv')),
        (1, in_set_props,
         dbus.Dictionary({ 'priority': dbus.Int32(1) }, signature='sv')),
        (1, in_set_props,
         dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') }, signature='sv')),
    ]
    for count, funcs, args in tests:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            netw = iface.AddNetwork(args)

    # No network may be left behind by the failed operations.
    remaining = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(remaining) > 0:
        raise Exception("Unexpected network block added")
    if len(dev[0].list_networks()) > 0:
        raise Exception("Unexpected network block visible")
1716
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    def wait_regdom_change():
        # Wait up to 1 s on the interface monitor, then up to 1 s on the
        # global monitor if nothing was seen there.
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
        if ev is None:
            dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                     timeout=1)

    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        dev[0].request("SET country US")
        wait_regdom_change()
        dev[0].request("SET country 00")
        wait_regdom_change()
        subprocess.call(['iw', 'reg', 'set', '00'])
1735
def _test_dbus_interface(dev, apdev):
    # Body of test_dbus_interface: create/get/remove an interface for 'lo'
    # with driver 'none' and check the error returned for invalid calls.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # Creating the same interface again has to fail with InterfaceExists.
    params = dbus.Dictionary({ 'Ifname': 'lo',
                               'Driver': 'none',
                               'ConfigFile': 'foo',
                               'BridgeIfname': 'foo', },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Removing twice: the second call has to fail with InterfaceUnknown.
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
        raise Exception("Invalid RemoveInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))

    # Unknown dictionary entry.
    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
                               'Foo': 123 },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Missing Ifname.
    params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # 'lo' was removed above, so looking it up has to fail.
    try:
        wpas.GetInterface("lo")
        raise Exception("Invalid GetInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            # This message used to name RemoveInterface (copy-paste error).
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
1792
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    _, wpas_obj, _, _ = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        wpas.CreateInterface(params)

    # Move the allocation failure point forward one step at a time until
    # CreateInterface() gets through without the failure having triggered.
    for count in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % count)
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        try:
            npath = wpas.CreateInterface(params)
            wpas.RemoveInterface(npath)
        except dbus.exceptions.DBusException:
            # D-Bus error reply for this failure point; try the next one.
            continue

        logger.info("CreateInterface succeeds after %d allocation failures" % count)
        state = dev[0].request('GET_ALLOC_FAIL')
        logger.info("GET_ALLOC_FAIL: " + state)
        dev[0].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL 0:")
        if count < 5:
            raise Exception("CreateInterface succeeded during out-of-memory")
        if not state.startswith('0:'):
            break

    for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
            wpas.CreateInterface(params)
1827
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/RemoveBlob/GetBlob parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    blob = dbus.ByteArray("\x01\x02\x03")
    iface.AddBlob('blob1', blob)

    # A second blob with the same name has to be rejected.
    try:
        iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
    else:
        raise Exception("Invalid AddBlob() accepted")

    # The stored blob must still match the first AddBlob() byte for byte.
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for got, expected in zip(res, blob):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected blob data")

    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal both RemoveBlob() and GetBlob() must report BlobUnknown.
    iface.RemoveBlob('blob1')
    try:
        iface.RemoveBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    else:
        raise Exception("Invalid RemoveBlob() accepted")

    try:
        iface.GetBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
    else:
        raise Exception("Invalid GetBlob() accepted")

    class TestDbusBlob(TestDbus):
        # Adds and removes 'blob2' from the main loop and records whether the
        # BlobAdded and BlobRemoved signals were delivered for it.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # Returning False keeps the timeout from firing again.
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1902
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the 1st, 2nd and 3rd allocation in wpas_dbus_handler_add_blob in
    # turn while issuing the same AddBlob() request.
    for count in range(1, 4):
        with alloc_fail_dbus(dev[0], count, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
1912
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Same three requests as before, issued in the same order.
    for autoscan_arg in ("foo", "periodic:1", ""):
        iface.AutoScan(autoscan_arg)
    # Reset the autoscan setting through the control interface.
    dev[0].request("AUTOSCAN ")
1922
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    autoscan_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation in wpas_dbus_handler_autoscan.
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
        autoscan_iface.AutoScan("foo")
    # Reset the autoscan setting through the control interface.
    dev[0].request("AUTOSCAN ")
1931
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)
    addr1 = dev[1].p2p_interface_addr()

    # "foo" is not a valid peer address for any of the TDLS methods.
    try:
        iface.TDLSDiscover("foo")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e))
    else:
        raise Exception("Invalid TDLSDiscover() accepted")

    try:
        iface.TDLSStatus("foo")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e))
    else:
        raise Exception("Invalid TDLSStatus() accepted")

    # No TDLS link has been set up with dev[1] at this point.
    status = iface.TDLSStatus(addr1)
    if status != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    try:
        iface.TDLSSetup("foo")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e))
    else:
        raise Exception("Invalid TDLSSetup() accepted")

    try:
        iface.TDLSTeardown("foo")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e))
    else:
        raise Exception("Invalid TDLSTeardown() accepted")

    # Well-formed address, but not a known peer.
    try:
        iface.TDLSTeardown("00:11:22:33:44:55")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: error performing TDLS teardown" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("TDLSTeardown accepted for unknown peer")
1979
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    tdls_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation in wpa_tdls_add_peer; TDLSSetup() is expected
    # to report the generic TDLS setup error.
    expected_error = "UnknownError: error performing TDLS setup"
    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected_error):
        tdls_iface.TDLSSetup("00:11:22:33:44:55")
1988
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Runs discover -> setup -> status -> teardown -> status against
        # dev[1] as a chain of main loop timeouts and records whether the
        # link was reported as connected and then as gone.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            # Step 1: discovery, then schedule the setup step.
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            # Step 2: setup, then schedule the status check.
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            # Step 3: verify the link is up, then tear it down.
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            # Step 4: verify the peer entry is gone and stop the main loop.
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2055
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # With a non-empty engine path the call may fail; only the error text is
    # checked, and a successful call is not treated as a failure.
    for engine, module in [ ("foo", "bar"), ("foo", "") ]:
        try:
            iface.SetPKCS11EngineAndModulePath(engine, module)
        except dbus.exceptions.DBusException as e:
            if "Error.Failed: Reinit of the EAPOL" not in str(e):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))

    # With an empty engine path the values must be readable back from the
    # corresponding properties.
    for module in [ "bar", "" ]:
        iface.SetPKCS11EngineAndModulePath("", module)
        res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != "":
            raise Exception("Unexpected PKCS11EnginePath value: " + res)
        res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != module:
            raise Exception("Unexpected PKCS11ModulePath value: " + res)
2092
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    # Whatever the outcome of the test body, put ap_scan back to 1 through
    # the control interface.
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")
2099
def _test_dbus_apscan(dev, apdev):
    # Read/write the ApScan property: valid values 0..2 round-trip, a wrongly
    # typed value and an out-of-range value are rejected.
    _, _, _, if_obj = prepare_dbus(dev[0])

    initial = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if initial != 1:
        raise Exception("Unexpected initial ApScan value: %d" % initial)

    for value in range(3):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(value),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != value:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (res, value))

    # Wrong D-Bus type (Int16 instead of UInt32).
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,-1) accepted")

    # Correct type, value out of range.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,123) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
               dbus_interface=dbus.PROPERTIES_IFACE)
2134
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values must round-trip through Set()/Get().
    for value in [ False, True ]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(value),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != value:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, value))

    # Wrong D-Bus type (Int16 instead of Boolean).
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))
    else:
        raise Exception("Invalid Set(FastReauth,-1) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
2162
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Valid values must round-trip through Set()/Get().
    expire_age = 179
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(expire_age),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expire_age:
        raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, expire_age))

    expire_count = 3
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(expire_count),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expire_count:
        raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, expire_count))

    # BSSExpireAge: wrong D-Bus type, then a value below the minimum.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))
    else:
        raise Exception("Invalid Set(BSSExpireAge,-1) accepted")

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))
    else:
        raise Exception("Invalid Set(BSSExpireAge,9) accepted")

    # BSSExpireCount: wrong D-Bus type, then zero.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))
    else:
        raise Exception("Invalid Set(BSSExpireCount,-1) accepted")

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))
    else:
        raise Exception("Invalid Set(BSSExpireCount,0) accepted")

    # Leave both properties at 180 / 2 for the following test cases.
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
2217
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    # Whatever the outcome of the test body, set the country back to 00 both
    # in wpa_supplicant and through iw.
    try:
        _test_dbus_country(dev, apdev)
    finally:
        dev[0].request("SET country 00")
        subprocess.call(['iw', 'reg', 'set', '00'])
2225
def _test_dbus_country(dev, apdev):
    # Set the Country property to FI and back to 00, checking the regulatory
    # domain change event each time, and verify that invalid values are
    # rejected in between.
    _, _, _, if_obj = prepare_dbus(dev[0])

    def expect_regdom_change(expected):
        # Wait for CTRL-EVENT-REGDOM-CHANGE and check that it contains the
        # expected text.
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if ev is None:
            # For now, work around separate P2P Device interface event delivery
            ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
        if ev is None:
            raise Exception("regdom change event not seen")
        if expected not in ev:
            raise Exception("Unexpected event contents: " + ev)

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    dev[0].dump_monitor()

    if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
               dbus_interface=dbus.PROPERTIES_IFACE)
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    expect_regdom_change("init=USER type=COUNTRY alpha2=FI")

    # Wrong D-Bus type (Int16 instead of a string).
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
    else:
        raise Exception("Invalid Set(Country,-1) accepted")

    # One-character country code.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: invalid country code" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
    else:
        raise Exception("Invalid Set(Country,F) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
               dbus_interface=dbus.PROPERTIES_IFACE)

    expect_regdom_change("init=CORE type=WORLD")
2277
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    # Whatever the outcome of the test body, set the scan interval back to 5
    # through the control interface.
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        dev[0].request("SCAN_INTERVAL 5")
2284
def _test_dbus_scan_interval(dev, apdev):
    # Read/write the ScanInterval property: a valid value round-trips, a
    # wrongly typed value and a negative value are rejected.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    interval = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(interval),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != interval:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, interval))

    # Wrong D-Bus type (UInt16 instead of Int32).
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
    else:
        raise Exception("Invalid Set(ScanInterval,100) accepted")

    # Correct type, negative value.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ScanInterval,-1) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
2313
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # dev[1] runs a social-channel P2P find until the end of the test.
    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        # Starts a P2P group on 2412 MHz, subscribes to Probe Request
        # reporting on the group interface once GroupStarted is seen, and
        # stops the main loop on the first ProbeRequest signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            # Both handles are kept on the instance for use after the loop.
            self.iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE)
            self.iface.SubscribeProbeReq()
            self.group_p2p = dbus.Interface(group_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            p2p.GroupAdd(dbus.Dictionary({ 'frequency': 2412 }))
            return False

        def success(self):
            return self.reported

    # First round: unsubscribe explicitly and check that a second
    # unsubscribe is rejected.
    with TestDbusProbe(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        t.iface.UnsubscribeProbeReq()
        try:
            t.iface.UnsubscribeProbeReq()
        except dbus.exceptions.DBusException as e:
            if "NoSubscription" not in str(e):
                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
        else:
            raise Exception("Invalid UnsubscribeProbeReq() accepted")
        t.group_p2p.Disconnect()

    with TestDbusProbe(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
2377
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Need to make sure this process has not already subscribed to avoid false
    # failures due to the operation succeeding due to os_strdup() not even
    # getting called.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException:
        was_subscribed = False
    else:
        was_subscribed = True

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
                         "SubscribeProbeReq"):
        iface.SubscribeProbeReq()

    if was_subscribed:
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.
        iface.SubscribeProbeReq()
2401
def test_dbus_p2p_invalid(dev, apdev):
    """D-Bus invalid P2P operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Several cases below disable P2P through the control interface for the
    # duration of a single call and re-enable it in a finally clause.

    # RejectPeer: well-formed peer path that is not known, then a bogus path.
    try:
        p2p.RejectPeer(path + "/Peers/00112233445566")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
    else:
        raise Exception("Invalid RejectPeer accepted")

    try:
        p2p.RejectPeer("/foo")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
    else:
        raise Exception("Invalid RejectPeer accepted")

    # RemoveClient: every argument dictionary here must be rejected.
    tests = [ { },
              { 'peer': 'foo' },
              { 'foo': "bar" },
              { 'iface': "abc" },
              { 'iface': 123 } ]
    for t in tests:
        try:
            p2p.RemoveClient(t)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
        else:
            raise Exception("Invalid RemoveClient accepted")

    # Find: invalid values, invalid array contents and unknown entries of
    # various D-Bus types.
    tests = [ {'DiscoveryType': 'foo'},
              {'RequestedDeviceTypes': 'foo'},
              {'RequestedDeviceTypes': ['foo']},
              {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
                                        '10','11','12','13','14','15','16',
                                        '17']},
              {'RequestedDeviceTypes': dbus.Array([], signature="s")},
              {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
              {'RequestedDeviceTypes': dbus.Array([], signature="i")},
              {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
                                        dbus.ByteArray('1234567')]},
              {'Foo': dbus.Int16(1)},
              {'Foo': dbus.UInt16(1)},
              {'Foo': dbus.Int64(1)},
              {'Foo': dbus.UInt64(1)},
              {'Foo': dbus.Double(1.23)},
              {'Foo': dbus.Signature('s')},
              {'Foo': 'bar'}]
    for t in tests:
        try:
            p2p.Find(dbus.Dictionary(t))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Find(): " + str(e))
        else:
            raise Exception("Invalid Find accepted")

    # RemovePersistentGroup: object paths that do not name a persistent group.
    for p in [ "/foo",
               "/fi/w1/wpa_supplicant1/Interfaces/1234",
               "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
        try:
            p2p.RemovePersistentGroup(dbus.ObjectPath(p))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
        else:
            raise Exception("Invalid RemovePersistentGroup accepted")

    # Listen while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Listen(5)
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Could not start P2P listen" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))
    else:
        raise Exception("Invalid Listen accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Listen with a string argument, sent through a proxy object created with
    # introspection disabled (presumably so that dbus-python does not reject
    # the wrongly typed argument before it is sent -- TODO confirm).
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    try:
        test_p2p.Listen("foo")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))
    else:
        raise Exception("Invalid Listen accepted")

    # ExtendedListen while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.ExtendedListen(dbus.Dictionary({}))
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
    else:
        raise Exception("Invalid ExtendedListen accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # PresenceRequest while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'duration1': 30000, 'interval1': 102400,
                 'duration2': 20000, 'interval2': 102400 }
        p2p.PresenceRequest(args)
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to invoke presence request" not in str(e):
            raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
    else:
        raise Exception("Invalid PresenceRequest accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # GroupAdd: negative frequency, then a persistent_group_object that is
    # the interface path rather than a persistent group path.
    try:
        params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
        p2p.GroupAdd(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
    else:
        raise Exception("Invalid GroupAdd accepted")

    try:
        params = dbus.Dictionary({'persistent_group_object':
                                  dbus.ObjectPath(path),
                                  'frequency': 2412})
        p2p.GroupAdd(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
    else:
        raise Exception("Invalid GroupAdd accepted")

    # Disconnect on the interface obtained from prepare_dbus() must fail.
    try:
        p2p.Disconnect()
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to disconnect" not in str(e):
            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
    else:
        raise Exception("Invalid Disconnect accepted")

    # Flush while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Flush()
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Flush: " + str(e))
    else:
        raise Exception("Invalid Flush accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'peer': path,
                 'join': True,
                 'wps_method': 'pbc',
                 'frequency': 2412 }
        p2p.Connect(args)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Connect: " + str(e))
    else:
        raise Exception("Invalid Connect accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect with invalid or incomplete argument dictionaries.
    tests = [ { 'frequency': dbus.Int32(-1) },
              { 'wps_method': 'pbc' },
              { 'wps_method': 'foo' } ]
    for args in tests:
        try:
            p2p.Connect(args)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Connect: " + str(e))
        else:
            raise Exception("Invalid Connect accepted")

    # Invite while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'peer': path }
        p2p.Invite(args)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Invite: " + str(e))
    else:
        raise Exception("Invalid Invite accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Invite with an unknown dictionary entry.
    try:
        args = { 'foo': 'bar' }
        p2p.Invite(args)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid Invite: " + str(e))
    else:
        raise Exception("Invalid Invite accepted")

    # ProvisionDiscoveryRequest: (peer path, method, expected error text).
    tests = [ (path, 'display', "InvalidArgs"),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'display',
               "UnknownError: Failed to send provision discovery request"),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'keypad',
               "UnknownError: Failed to send provision discovery request"),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'pbc',
               "UnknownError: Failed to send provision discovery request"),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'pushbutton',
               "UnknownError: Failed to send provision discovery request"),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'foo', "InvalidArgs") ]
    for (p, method, err) in tests:
        try:
            p2p.ProvisionDiscoveryRequest(p, method)
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
        else:
            raise Exception("Invalid ProvisionDiscoveryRequest accepted")

    # Reading the Peers property while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
    else:
        raise Exception("Invalid Get(Peers) accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")
2623
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    # Every case issues one P2PDevice method call inside an alloc_fail_dbus()
    # context. That helper is defined elsewhere in this file; here it is only
    # handed the device, a count, a function pattern, the D-Bus method name and
    # the error string "InvalidArgs".
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    one_blob = [dbus.ByteArray('123')]

    # (count, function pattern, value stored under the bogus 'Foo' key)
    find_cases = [
        (1, "_wpa_dbus_dict_entry_get_string_array", ['bar']),
        (2, "_wpa_dbus_dict_entry_get_string_array", ['bar']),
        (10, "_wpa_dbus_dict_entry_get_string_array",
         [str(i) for i in range(1, 10)]),
        (1, ":=_wpa_dbus_dict_entry_get_binarray", one_blob),
        (1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         one_blob),
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         [dbus.ByteArray('123') for _ in range(11)]),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         one_blob),
        (1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         path),
    ]
    for count, funcs, value in find_cases:
        with alloc_fail_dbus(dev[0], count, funcs, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({'Foo': value}))

    # The same AddService() arguments are used for the first and the second
    # allocation in _wpa_dbus_dict_entry_get_byte_array.
    service_args = {'service_type': 'bonjour',
                    'response': dbus.ByteArray(500 * 'b')}
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(service_args)
2680
def test_dbus_p2p_discovery(dev, apdev):
    """D-Bus P2P discovery

    dev[0] is driven over D-Bus and runs P2P Find; dev[1] and dev[2] are put
    into listen state through the control interface. The test passes once
    both peers were reported with the expected properties, a DeviceLost
    signal was seen, and FindStopped was signalled.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # dev[1] advertises a secondary device type and a vendor element; both
    # are looked for in its peer properties in deviceFound() below.
    dev[1].request("SET sec_device_type 1-0050F204-2")
    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()
    a1 = binascii.unhexlify(addr1.replace(':', ''))

    # dev[2] enables Wi-Fi Display; the peer 'IEs' property is compared
    # against the same device info payload.
    wfd_devinfo = "00001c440028"
    dev[2].request("SET wifi_display 1")
    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
    wfd = binascii.unhexlify('000006' + wfd_devinfo)
    dev[2].p2p_listen()
    addr2 = dev[2].p2p_dev_addr()
    a2 = binascii.unhexlify(addr2.replace(':', ''))

    # No peers may be known before discovery has been started.
    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if 'Peers' not in res:
        raise Exception("GetAll result missing Peers")
    if len(res['Peers']) != 0:
        raise Exception("Unexpected peer(s) in the list")

    # Exercise Find() with the full argument set and stop it immediately.
    args = {'DiscoveryType': 'social',
            'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
            'Timeout': dbus.Int32(1)}
    p2p.Find(dbus.Dictionary(args))
    p2p.StopFind()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False          # dev[1] reported and verified
            self.found2 = False         # dev[2] reported and verified
            self.lost = False           # DeviceLost seen
            self.find_stopped = False   # FindStopped seen

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceLost")
            self.add_signal(self.provisionDiscoveryResponseEnterPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryResponseEnterPin")
            self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
                            "FindStopped")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if len(res) < 1:
                raise Exception("Unexpected number of peers")
            if path not in res:
                raise Exception("Mismatch in peer object path")
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("peer properties: " + str(res))

            if res['DeviceAddress'] == a1:
                if 'SecondaryDeviceTypes' not in res:
                    raise Exception("Missing SecondaryDeviceTypes")
                sec = res['SecondaryDeviceTypes']
                if len(sec) < 1:
                    raise Exception("Secondary device type missing")
                if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
                    raise Exception("Secondary device type mismatch")

                if 'VendorExtension' not in res:
                    raise Exception("Missing VendorExtension")
                vendor = res['VendorExtension']
                if len(vendor) < 1:
                    raise Exception("Vendor extension missing")
                if "\x11\x22\x33\x44" not in vendor:
                    # This check used to reuse the secondary device type
                    # message, which pointed at the wrong property.
                    raise Exception("Vendor extension mismatch")

                self.found = True
            elif res['DeviceAddress'] == a2:
                if 'IEs' not in res:
                    raise Exception("IEs missing")
                if res['IEs'] != wfd:
                    raise Exception("IEs mismatch")
                self.found2 = True
            else:
                raise Exception("Unexpected peer device address")

            if self.found and self.found2:
                # Both peers verified: stop the search, then reject the peer
                # from this signal and send it a provision discovery request.
                p2p.StopFind()
                p2p.RejectPeer(path)
                p2p.ProvisionDiscoveryRequest(path, 'display')

        def deviceLost(self, path):
            logger.debug("deviceLost: path=%s" % path)
            self.lost = True
            # RejectPeer() on the peer that was just reported lost is
            # expected to fail with this specific error.
            try:
                p2p.RejectPeer(path)
                raise Exception("Invalid RejectPeer accepted")
            except dbus.exceptions.DBusException as e:
                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
            self.loop.quit()

        def provisionDiscoveryResponseEnterPin(self, peer_object):
            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
            p2p.Flush()

        def findStopped(self):
            logger.debug("findStopped")
            self.find_stopped = True

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            # False: do not re-arm the gobject timeout.
            return False

        def success(self):
            return self.found and self.lost and self.found2 and self.find_stopped

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
    dev[1].p2p_stop_find()

    # Reverse direction: dev[0] listens via D-Bus and dev[2] must find it.
    p2p.Listen(1)
    dev[2].p2p_stop_find()
    dev[2].request("P2P_FLUSH")
    if not dev[2].discover_peer(addr0):
        raise Exception("Peer not found")
    p2p.StopFind()
    dev[2].p2p_stop_find()

    try:
        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))

    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
    p2p.ExtendedListen(dbus.Dictionary({}))
    dev[0].global_request("P2P_EXT_LISTEN")
2837
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery

    Exercises AddService, DeleteService, FlushService,
    ServiceDiscoveryRequest, ServiceDiscoveryCancelRequest and
    ServiceDiscoveryResponse on dev[0], including sets of arguments that are
    expected to be rejected with an InvalidArgs error.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def expect_invalid_args(method, arg, accepted_msg, name):
        # Call method(arg) and require a D-Bus error mentioning InvalidArgs.
        # accepted_msg is raised when the call unexpectedly succeeds; name is
        # the method name used in the unexpected-error message.
        try:
            method(arg)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid " +
                                name + "(): " + str(e))
            return
        raise Exception(accepted_msg)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))

    args = {'service_type': 'bonjour',
            'query': bonjour_query,
            'response': bonjour_response}
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # DeleteService() with the AddService() argument set (including
    # 'response') is expected to be rejected.
    expect_invalid_args(p2p.DeleteService, args,
                        "Invalid DeleteService() accepted", "DeleteService")

    # The first delete succeeds; deleting the same service again must fail.
    args = {'service_type': 'bonjour',
            'query': bonjour_query}
    p2p.DeleteService(args)
    expect_invalid_args(p2p.DeleteService, args,
                        "Invalid DeleteService() accepted", "DeleteService")

    args = {'service_type': 'upnp',
            'version': 0x10,
            'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}
    p2p.AddService(args)
    p2p.DeleteService(args)
    expect_invalid_args(p2p.DeleteService, args,
                        "Invalid DeleteService() accepted", "DeleteService")

    tests = [{'service_type': 'foo'},
             {'service_type': 'foo', 'query': bonjour_query},
             {'service_type': 'upnp'},
             {'service_type': 'upnp', 'version': 0x10},
             {'service_type': 'upnp',
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'version': 0x10,
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'service_type': 'upnp', 'foo': 'bar'},
             {'service_type': 'bonjour'},
             {'service_type': 'bonjour', 'query': 'foo'},
             {'service_type': 'bonjour', 'foo': 'bar'}]
    for args in tests:
        expect_invalid_args(p2p.DeleteService, args,
                            "Invalid DeleteService() accepted",
                            "DeleteService")

    tests = [{'service_type': 'foo'},
             {'service_type': 'upnp'},
             {'service_type': 'upnp', 'version': 0x10},
             {'service_type': 'upnp',
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'version': 0x10,
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'service_type': 'upnp', 'foo': 'bar'},
             {'service_type': 'bonjour'},
             {'service_type': 'bonjour', 'query': 'foo'},
             {'service_type': 'bonjour', 'response': 'foo'},
             {'service_type': 'bonjour', 'query': bonjour_query},
             {'service_type': 'bonjour', 'response': bonjour_response},
             {'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a')},
             {'service_type': 'bonjour', 'foo': 'bar'}]
    for args in tests:
        expect_invalid_args(p2p.AddService, args,
                            "Invalid AddService() accepted", "AddService")

    # A request reference can be cancelled exactly once; a second cancel and
    # a cancel of reference 0 must both be rejected. These two checks used to
    # name AddService() in their unexpected-error message.
    args = {'tlv': dbus.ByteArray("\x02\x00\x00\x01")}
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    expect_invalid_args(p2p.ServiceDiscoveryCancelRequest, ref,
                        "Invalid ServiceDiscoveryCancelRequest() accepted",
                        "ServiceDiscoveryCancelRequest")
    expect_invalid_args(p2p.ServiceDiscoveryCancelRequest, dbus.UInt64(0),
                        "Invalid ServiceDiscoveryCancelRequest() accepted",
                        "ServiceDiscoveryCancelRequest")

    args = {'service_type': 'upnp',
            'version': 0x10,
            'service': 'ssdp:foo'}
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    tests = [{'service_type': 'foo'},
             {'foo': 'bar'},
             {'tlv': 'foo'},
             {},
             {'version': 0},
             {'service_type': 'upnp',
              'service': 'ssdp:foo'},
             {'service_type': 'upnp',
              'version': 0x10},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': dbus.ObjectPath(path + "/Peers")},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': path + "/Peers"},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566")}]
    for args in tests:
        expect_invalid_args(p2p.ServiceDiscoveryRequest, args,
                            "Invalid ServiceDiscoveryRequest accepted",
                            "ServiceDiscoveryRequest")

    args = {'foo': 'bar'}
    expect_invalid_args(p2p.ServiceDiscoveryResponse,
                        dbus.Dictionary(args, signature='sv'),
                        "Invalid ServiceDiscoveryResponse accepted",
                        "ServiceDiscoveryResponse")
2986
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    # dev[1] registers a Bonjour service and listens. dev[0] runs Find over
    # D-Bus and sends a service discovery request to each peer it finds; the
    # test is done when a ServiceDiscoveryResponse signal arrives.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    responder = dev[1]
    responder.request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    responder.p2p_listen()
    addr1 = responder.p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra add_signal() keyword arguments)
            hooks = ((self.deviceFound, "DeviceFound", {}),
                     (self.serviceDiscoveryResponse,
                      "ServiceDiscoveryResponse", {'byte_arrays': True}))
            for handler, signal, extra in hooks:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            query = {'peer_object': path,
                     'tlv': dbus.ByteArray("\x02\x00\x00\x01")}
            p2p.ServiceDiscoveryRequest(query)

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = {'DiscoveryType': 'social',
                           'Timeout': dbus.Int32(10)}
            p2p.Find(dbus.Dictionary(find_params))
            # One-shot: returning False keeps the timeout from re-firing.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    responder.p2p_stop_find()
3038
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    # The real test body lives in the underscore-prefixed helper. Whatever it
    # does (including raising), dev[0] is afterwards sent
    # "P2P_SERV_DISC_EXTERNAL 0" over the control interface.
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3045
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    # dev[1] queues a service discovery request towards dev[0] and starts a
    # social find. dev[0] enables external SD handling over D-Bus, listens,
    # and answers the ServiceDiscoveryRequest signal with a fixed TLV blob
    # that dev[1] must then report back in its P2P-SERV-DISC-RESP event.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    resp = "0300000101"

    requester = dev[1]
    requester.request("P2P_FLUSH")
    requester.request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    requester.p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in ((self.deviceFound, "DeviceFound"),
                                    (self.serviceDiscoveryRequest,
                                     "ServiceDiscoveryRequest")):
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Echo the addressing fields from the request and attach the
            # canned response TLVs.
            reply = {}
            reply['peer_object'] = sd_request['peer_object']
            reply['frequency'] = sd_request['frequency']
            reply['dialog_token'] = sd_request['dialog_token']
            reply['tlvs'] = dbus.ByteArray(binascii.unhexlify(resp))
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(reply,
                                                         signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            # One-shot: returning False keeps the timeout from re-firing.
            return False

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    ev = requester.wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    # The fifth space-separated field of the event is compared with the hex
    # response that was sent.
    fields = ev.split(' ')
    if fields[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    requester.p2p_stop_find()

    p2p.StopFind()
    p2p.ServiceDiscoveryExternal(0)
3112
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO

    Sequence driven by the signal handlers below:
    1. run_test() adds a persistent group on dev[0].
    2. The first GroupStarted disconnects the group again; GroupFinished then
       re-starts it from the persistent group object.
    3. The second GroupStarted makes wlan1 join with a PIN; the provision
       discovery signal authorizes the peer through WPS.Start().
    4. StaAuthorized verifies peer/group properties and the
       WPSVendorExtensions property, removes the client and disconnects.
    5. The final GroupFinished removes the persistent group, and
       PersistentGroupRemoved ends the main loop.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True            # next GroupStarted is the first one
            self.waiting_end = False     # final Disconnect() has been issued
            # Set before every failure raised from a handler and checked in
            # success(). NOTE(review): presumably needed because exceptions
            # raised inside signal callbacks do not reach the test body;
            # confirm against dbus-python behavior.
            self.exceptions = False
            self.deauthorized = False    # StaDeauthorized seen
            self.done = False            # PersistentGroupRemoved seen

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self.exceptions = True
                raise Exception("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self.exceptions = True
                raise Exception("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                self.exceptions = True
                raise Exception("Unexpected PeerGO value: " + str(go))
            if self.first:
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object

            # A registrar PIN request carrying both P2PDeviceAddress and
            # Bssid but no Pin is expected to be rejected.
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin'}
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            try:
                wps.Start(params)
                self.exceptions = True
                raise Exception("Invalid WPS.Start() accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message: " + str(e))
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def _expect_vendor_ext_set_failure(self, g_obj, vals, err):
            # Set(WPSVendorExtensions) with vals must fail with a D-Bus error
            # whose text contains err; anything else is recorded as failure.
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException as e:
                if err not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
                return
            self.exceptions = True
            raise Exception("Invalid Set(WPSVendorExtensions) accepted")

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self.exceptions = True
                raise Exception("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of group members")

            ext = dbus.ByteArray("\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({'WPSVendorExtensions': [ext]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != ext:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted
            # as well.
            res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value that was read back (res2). The local list res
            # always has two entries at this point, so testing it would
            # verify nothing and res2[1] below could raise IndexError.
            if len(res2) != 2:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # Twelve entries in total are expected to be refused.
            for i in range(10):
                res.append(dbus.ByteArray('\xaa\xbb'))
            self._expect_vendor_ext_set_failure(g_obj, res, "Error.Failed")

            # Dictionary wrapper with an unexpected key.
            vals = dbus.Dictionary({'Foo': [ext]}, signature='sv')
            self._expect_vendor_ext_set_failure(g_obj, vals, "InvalidArgs")

            # Wrong element types.
            self._expect_vendor_ext_set_failure(g_obj, ["foo"],
                                                "Error.Failed")
            self._expect_vendor_ext_set_failure(g_obj, [["foo"]],
                                                "Error.Failed")

            p2p.RemoveClient({'peer': self.peer_path})

            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # False: do not re-arm the gobject timeout.
            return False

        def success(self):
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
3374
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    # Flow: run_test() adds a group on dev[0]; GroupStarted makes wlan1 join
    # with PBC; the PBC provision discovery signal authorizes the peer via
    # WPS.Start(); StaAuthorized disconnects the group; GroupFinished ends
    # the main loop.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, interface, signal name), registered in this order.
            hooks = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound"),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.provisionDiscoveryPBCRequest,
                 WPAS_DBUS_IFACE_P2PDEVICE, "ProvisionDiscoveryPBCRequest"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            )
            for handler, iface, signal in hooks:
                self.add_signal(handler, iface, signal)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            props = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                    dbus_interface=dbus.PROPERTIES_IFACE,
                                    byte_arrays=True)
            self.peer = props
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            # Colon-separated hex form of the last path component; it is
            # computed here but not used further in this handler.
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ':'.join(['%02x' % ord(octet) for octet in peer])
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Type': 'pbc'}
            logger.info("Authorize peer to connect to the group")
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            dbus.Interface(self.g_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # One-shot: returning False keeps the timeout from re-firing.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    joiner = dev[1]
    joiner.wait_go_ending_session()
    joiner.flush_scan_cache()
3464
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    # dev[0] starts a group through the D-Bus GroupAdd method, the wlan1
    # station enrolls using a WPS PIN, and the group is disconnected once the
    # StaAuthorized signal has been seen.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True when GroupFinished is received.
            self.done = False

        def __enter__(self):
            # Start the test step right away and schedule the base-class
            # timeout handler at 15000 ms.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            subscriptions = [
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            ]
            for handler, iface, name in subscriptions:
                self.add_signal(handler, iface, name)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            # Render the BSSID byte array as a colon separated hex string.
            bssid = ':'.join(binascii.hexlify(octet)
                             for octet in group_props['BSSID'])

            pin = '12345670'
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': pin})

            # Drive the station side through the wlan1 control interface.
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.request("WPS_PIN " + bssid + " " + pin)
            # Saved for staAuthorized(), which disconnects the group.
            self.group_p2p = dbus.Interface(g_if_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3534
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    # dev[1] runs a GO, dev[2] listens, and dev[0] (driven over D-Bus) joins
    # the group of dev[1] and then invites dev[2] to it.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # D-Bus object paths of the peer to invite and of the GO.
            self.peer = None
            self.go = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            subscriptions = [
                (self.deviceFound, "DeviceFound"),
                (self.groupStarted, "GroupStarted"),
                (self.groupFinished, "GroupFinished"),
                (self.invitationResult, "InvitationResult"),
            ]
            for handler, name in subscriptions:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, name)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            peer_props = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                         dbus_interface=dbus.PROPERTIES_IFACE,
                                         byte_arrays=True)
            logger.debug('peer properties: ' + str(peer_props))

            # The object path contains the device address without colons.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path

            if not (self.peer and self.go):
                # Keep waiting until both devices have been discovered.
                return

            logger.info("Join the group")
            p2p.StopFind()
            pin = p2p.Connect({'peer': self.go,
                               'join': True,
                               'wps_method': 'pin',
                               'frequency': 2412})

            # Hand the PIN returned by Connect() to the GO's group interface.
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = dev1_group_ifname
            go_dev.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])

            def p2p_prop(name):
                # Read one P2PDevice property from the group interface.
                return g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, name,
                                    dbus_interface=dbus.PROPERTIES_IFACE)

            role = p2p_prop("Role")
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = p2p_prop("Group")
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = p2p_prop("PeerGO")
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            # NOTE(review): ext is built but the Set() below passes res;
            # confirm which value was intended before changing either.
            ext = dbus.ByteArray("\x11\x22\x33\x44")
            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
            else:
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.PresenceRequest({'duration1': 30000,
                                       'interval1': 102400,
                                       'duration2': 20000,
                                       'interval2': 102400})

            group_p2p.Invite({'peer': self.peer})

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Remove the group on the GO side; this test finishes when the
            # GroupFinished signal is then received.
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = dev1_group_ifname
            go_dev.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
3659
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    # wlan1 sends a P2P_INVITE for a persistent group to dev[0]; the test
    # passes once dev[0] reports it through the InvitationReceived signal.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    form(dev[0], dev[1])
    addr0 = dev[0].p2p_dev_addr()
    dev[0].p2p_listen()
    dev[0].global_request("SET persistent_reconnect 0")

    found = dev[1].discover_peer(addr0, social=True)
    if not found:
        raise Exception("Peer " + addr0 + " not found")
    peer = dev[1].get_peer(addr0)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True when InvitationReceived is seen.
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.invitationReceived,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationReceived")
            self.loop.run()
            return self

        def invitationReceived(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            inviter = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            inviter.global_request("P2P_INVITE persistent=" +
                                   peer['persistent'] + " peer=" + addr0)
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].p2p_stop_find()
    dev[1].p2p_stop_find()
3708
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    # The actual checks live in _test_dbus_p2p_config(); this wrapper only
    # makes sure the SSID postfix is cleared again, whether or not they pass.
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        dev[0].request("P2P_SET ssid_postfix ")
3715
def _test_dbus_p2p_config(dev, apdev):
    # Exercises the P2PDeviceConfig property of the P2PDevice D-Bus
    # interface on dev[0]:
    #  1. writing back the values just read leaves them unchanged,
    #  2. SsidPostfix/VendorExtension/SecondaryDeviceTypes can be set and
    #     then cleared again,
    #  3. Get and Set are rejected while P2P is disabled,
    #  4. wrong value types and unknown keys are rejected with InvalidArgs.
    # Raises Exception on the first check that fails.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    p2p_disabled_err = "Error.Failed: P2P is not available for this interface"

    def get_config():
        # Read the whole P2PDeviceConfig dictionary.
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(values):
        # Write the given dictionary to P2PDeviceConfig.
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", values,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Round trip: writing back what was read must not change any value.
    res = get_config()
    set_config(res)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    # Set one vendor extension and one secondary device type.
    changes = {'SsidPostfix': 'foo',
               'VendorExtension': [dbus.ByteArray('\x11\x22\x33\x44')],
               'SecondaryDeviceTypes': [dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88')]}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Empty arrays are expected to remove the entries again.
    changes = {'SsidPostfix': '',
               'VendorExtension': dbus.Array([], signature="ay"),
               'SecondaryDeviceTypes': dbus.Array([], signature="ay")}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Get must fail while P2P is disabled; P2P is re-enabled in any case.
    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if p2p_disabled_err not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Set must fail while P2P is disabled as well.
    try:
        dev[0].request("P2P_SET disabled 1")
        changes = {'SsidPostfix': 'foo'}
        set_config(dbus.Dictionary(changes, signature='sv'))
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if p2p_disabled_err not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Integer values for string parameters and an unknown key must each be
    # rejected with InvalidArgs.
    tests = [{'DeviceName': 123},
             {'SsidPostfix': 123},
             {'Foo': 'Bar'}]
    for changes in tests:
        try:
            set_config(dbus.Dictionary(changes, signature='sv'))
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
3804
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    # Creates a persistent group on dev[0] through D-Bus, records the
    # persistent group object announced by PersistentGroupAdded, and then
    # exercises the persistent group Properties, AddPersistentGroup,
    # RemoveAllPersistentGroups and RemovePersistentGroup.
    # Raises Exception on the first check that fails.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path from PersistentGroupAdded. It stays None if the
            # signal never arrives, so that success() can report the missing
            # signal instead of the caller hitting an AttributeError.
            self.persistent = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The running group is disconnected right away; only the stored
            # persistent group entry is used by the rest of the test.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            # The checks below need the persistent group object path.
            return self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Changing the ssid through Properties must leave the other values as
    # they were; the value is read back enclosed in double quotes.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # A second, manually added persistent group must show up in the list.
    args = dbus.Dictionary({'ssid': 'DIRECT-testing',
                            'psk': '1234567890'}, signature='sv')
    p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The original group is gone now, so removing it again must fail.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
3897
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # dev[0] creates a persistent group over D-Bus and wlan1 joins it with a
    # PIN. After that group has finished, dev[0] invites wlan1 with the
    # stored persistent group object and waits for the new group to finish.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False
            # Set once the persistent group invitation has been sent.
            self.invited = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            subscriptions = [
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound"),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.persistentGroupAdded, WPAS_DBUS_IFACE_P2PDEVICE,
                 "PersistentGroupAdded"),
                (self.provisionDiscoveryRequestDisplayPin,
                 WPAS_DBUS_IFACE_P2PDEVICE,
                 "ProvisionDiscoveryRequestDisplayPin"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            ]
            for handler, iface, name in subscriptions:
                self.add_signal(handler, iface, name)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if self.invited:
                # Re-invoked group: wlan1 is not asked to join here.
                return

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            group_props = g_obj.GetAll(WPAS_DBUS_GROUP,
                                       dbus_interface=dbus.PROPERTIES_IFACE,
                                       byte_arrays=True)
            bssid = ':'.join(binascii.hexlify(octet)
                             for octet in group_props['BSSID'])
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                self.done = True
                self.loop.quit()
                return

            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.global_request("SET persistent_reconnect 1")
            dev1.p2p_listen()

            # 'path' is the interface object path returned by prepare_dbus(),
            # not a persistent group object, so this Invite is expected to
            # be rejected with InvalidArgs.
            bad_args = {'persistent_group_object': dbus.ObjectPath(path),
                        'peer': self.peer_path}
            try:
                p2p.Invite(bad_args)
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Invite: " + str(e))
            else:
                raise Exception("Invalid Invite accepted")

            p2p.Invite({'persistent_group_object': self.persistent,
                        'peer': self.peer_path})
            self.invited = True

            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            # The last path component is the peer address as hex digits; the
            # formatted address is not used further below.
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ':'.join('%02x' % ord(octet) for octet in peer)
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            removed = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
            if removed is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4036
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # wlan1 starts GO Negotiation towards dev[0]; dev[0] answers the
    # GONegotiationRequest signal with Connect() and disconnects the
    # resulting group as soon as it has started.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True when GroupFinished is received.
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra add_signal() keyword arguments)
            subscriptions = [
                (self.deviceFound, "DeviceFound", {}),
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.groupFinished, "GroupFinished", {}),
            ]
            for handler, name, extra in subscriptions:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, name,
                                **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)

            # A Connect() with frequency 5175 is expected to be rejected
            # with ConnectChannelUnsupported.
            bad_args = {'peer': path, 'wps_method': 'display',
                        'pin': '12345670', 'go_intent': 15,
                        'persistent': False, 'frequency': 5175}
            try:
                p2p.Connect(bad_args)
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))
            else:
                raise Exception("Invalid Connect accepted")

            # Same request without the frequency.
            p2p.Connect({'peer': path, 'wps_method': 'display',
                         'pin': '12345670', 'go_intent': 15,
                         'persistent': False})

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            initiator = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            found = initiator.discover_peer(addr0)
            if not found:
                raise Exception("Peer not found")
            initiator.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4118
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # dev[0] authorizes the discovered peer with Connect(authorize_only) and
    # wlan1 then starts GO Negotiation. The test requires the PeerJoined and
    # PeerDisconnected group signals in addition to GroupFinished.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, interface, signal name, extra keyword arguments)
            subscriptions = [
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound",
                 {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted",
                 {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.staDeauthorized, WPAS_DBUS_IFACE, "StaDeauthorized",
                 {}),
                (self.peerJoined, WPAS_DBUS_GROUP, "PeerJoined", {}),
                (self.peerDisconnected, WPAS_DBUS_GROUP, "PeerDisconnected",
                 {}),
            ]
            for handler, iface, name, extra in subscriptions:
                self.add_signal(handler, iface, name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

            # A keypad Connect() without a 'pin' entry is expected to be
            # rejected with InvalidArgs.
            no_pin_args = {'peer': path, 'wps_method': 'keypad',
                           'go_intent': 15, 'authorize_only': True}
            try:
                p2p.Connect(no_pin_args)
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))
            else:
                raise Exception("Invalid Connect accepted")

            p2p.Connect({'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670', 'go_intent': 15,
                         'authorize_only': True})
            p2p.Listen(10)

            # Let wlan1 find dev[0] and start the negotiation.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            started = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which passes it to group_form_result().
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def staDeauthorized(self, name):
            logger.debug("staDeuthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            if not self.done:
                return self.done
            return self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4219
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # dev[0] starts GO Negotiation over D-Bus with go_intent 0 and wlan1
    # answers with go_intent 15. Both sides then remove the group. The test
    # passes once GroupFinished has been seen and the peer object's Groups
    # property has been reported first non-empty and then empty.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 0}
            p2p.Connect(args)

            # wlan1 answers only after it has seen the GO Negotiation Request.
            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which passes it to group_form_result().
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The main loop is stopped from propertiesChanged(), not here.
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if "Groups" not in changed_properties:
                return
            groups = changed_properties["Groups"]
            if len(groups) > 0:
                self.peer_group_added = True
            # Count an empty Groups list as a removal only after a group has
            # been seen. Without this an empty update arriving first would
            # stop the main loop before the group was ever reported.
            if len(groups) == 0 and self.peer_group_added:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4309
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        # Signal-driven scenario: find the peer, form a group in which wlan1
        # is the GO (go_intent=15 vs. our go_intent 0), then have wlan1
        # remove the group and watch the peer's Groups property.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = {'peer': path, 'wps_method': 'keypad',
                            'pin': '12345670', 'go_intent': 0}
            p2p.Connect(connect_args)

            event = peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                               timeout=15)
            if event is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            event = peer_dev.wait_global_event(["P2P-GROUP-STARTED"],
                                               timeout=15)
            if event is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which passes it to group_form_result().
            self.sta_group_ev = event

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The proxy object is not used further in this handler.
            group_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                          properties['interface_object'])
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer_dev.group_form_result(self.sta_group_ev)
            peer_dev.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if (interface_name != WPAS_DBUS_P2P_PEER or
                    "Groups" not in changed_properties):
                return
            groups = changed_properties["Groups"]
            if len(groups) > 0:
                self.peer_group_added = True
            elif self.peer_group_added:
                # An empty Groups list only counts as removal once a
                # non-empty list has been seen.
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4397
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    wpas0 = dev[0]
    # p2p_group_idle is 1 only while the test body runs; the finally clause
    # sets it back to 0 even when the body raises.
    try:
        wpas0.global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        wpas0.global_request("SET p2p_group_idle 0")
4405
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    # Body of test_dbus_p2p_group_idle_timeout; the caller sets and resets
    # p2p_group_idle around this function.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.group_started = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = {'peer': path, 'wps_method': 'keypad',
                            'pin': '12345670', 'go_intent': 0}
            p2p.Connect(connect_args)

            event = peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                               timeout=15)
            if event is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            event = peer_dev.wait_global_event(["P2P-GROUP-STARTED"],
                                               timeout=15)
            if event is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which passes it to group_form_result().
            self.sta_group_ev = event

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            # The proxy object is not used further in this handler.
            group_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                          properties['interface_object'])
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer_dev.group_form_result(self.sta_group_ev)
            sta_addr = peer_dev.group_request("STA-FIRST").splitlines()[0]
            # Force disassociation with a different reason code so that the
            # P2P Client using D-Bus does not get the normal group
            # termination event from the GO.
            peer_dev.group_request("DEAUTHENTICATE " + sta_addr + " reason=0 test=0")
            peer_dev.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Peer property changes are ignored until GroupStarted was seen.
            if (interface_name != WPAS_DBUS_P2P_PEER or
                    not self.group_started or
                    "Groups" not in changed_properties):
                return
            groups = changed_properties["Groups"]
            if len(groups) > 0:
                self.peer_group_added = True
            else:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4501
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # wlan1 connects with PIN 87654321 while this side answers with PIN
        # 12345670; the test passes once both WpsFailed and
        # GroupFormationFailure have been signalled.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.goNegotiationRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationRequest",
                            byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
                            "WpsFailed")
            self.add_signal(self.groupFormationFailure,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFormationFailure")
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            connect_args = {'peer': path, 'wps_method': 'display',
                            'pin': '12345670', 'go_intent': 15}
            p2p.Connect(connect_args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            # Stop only when the other expected signal has arrived as well.
            if self.formation_failure:
                self.loop.quit()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            # Stop only when the other expected signal has arrived as well.
            if self.wps_failed:
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not peer_dev.discover_peer(addr0):
                raise Exception("Peer not found")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4577
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname

    class TestDbusP2p(TestDbus):
        # Sequence driven by signals: join dev[1]'s group (group1), start an
        # autonomous GO (group2), let wlan2 join group2, verify the reported
        # roles, then disconnect both groups.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None
            self.group1 = None
            self.group2 = None
            self.groups_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged", byte_arrays=True)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Peer object paths are matched on the colon-less device address.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path
            if self.go and not self.group1:
                logger.info("Join the group")
                p2p.StopFind()
                pin = '12345670'
                go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                go_dev.group_ifname = dev1_group_ifname
                go_dev.group_request("WPS_PIN any " + pin)
                join_args = {'peer': self.go,
                             'join': True,
                             'wps_method': 'pin',
                             'pin': pin,
                             'frequency': 2412}
                p2p.Connect(join_args)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            dev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(dev_props))

            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            logger.debug("Group properties: " + str(group_props))

            if not self.group1:
                # First GroupStarted: record group1, then start group2.
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                go_params = dbus.Dictionary({'frequency': 2412})
                p2p.GroupAdd(go_params)
            elif not self.group2:
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = group_props['BSSID']

            if self.group1 and self.group2:
                logger.info("Authorize peer to join the group")
                peer_addr = binascii.unhexlify(addr2.replace(':', ''))
                wps_params = {'Role': 'enrollee',
                              'P2PDeviceAddress': dbus.ByteArray(peer_addr),
                              'Bssid': dbus.ByteArray(peer_addr),
                              'Type': 'pin',
                              'Pin': '12345670'}
                g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
                g_wps.Start(wps_params)

                bssid = ':'.join([binascii.hexlify(octet)
                                  for octet in self.g2_bssid])
                joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
                joiner.scan_for_bss(bssid, freq=2412)
                joiner.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
                event = joiner.wait_global_event(["P2P-GROUP-STARTED"],
                                                 timeout=15)
                if event is None:
                    raise Exception("Group join timed out")
                # Kept for peerJoined(), which passes it to group_form_result().
                self.dev2_group_ev = event

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))

            finished = properties['group_object']
            if self.group1 == finished:
                self.group1 = None
            elif self.group2 == finished:
                self.group2 = None

            if not self.group1 and not self.group2:
                self.done = True
                self.loop.quit()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            if self.groups_removed:
                return
            self.check_results()

            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.group_form_result(self.dev2_group_ev)
            joiner.remove_group()

            logger.info("Disconnect group2")
            group_p2p = dbus.Interface(self.g2_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

            logger.info("Disconnect group1")
            group_p2p = dbus.Interface(self.g1_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            self.groups_removed = True

        def check_results(self):
            logger.info("Check results with two concurrent groups in operation")

            g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
            g1_props = g1_obj.GetAll(WPAS_DBUS_GROUP,
                                     dbus_interface=dbus.PROPERTIES_IFACE,
                                     byte_arrays=True)

            g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
            g2_props = g2_obj.GetAll(WPAS_DBUS_GROUP,
                                     dbus_interface=dbus.PROPERTIES_IFACE,
                                     byte_arrays=True)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(g1_props))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(g2_props))

            dev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(dev_props))

            if g1_props['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + g1_props['Role'])
            if g2_props['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + g2_props['Role'])
            if dev_props['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + dev_props['Role'])

            if len(g2_props['Members']) != 1:
                raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
4768
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    # Cancel() before any Connect() has been issued must be rejected with a
    # D-Bus error.
    try:
        p2p.Cancel()
    except dbus.exceptions.DBusException:
        pass
    else:
        raise Exception("Unexpected p2p.Cancel() success")

    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        # Once a peer is found, start a connection and cancel it right away.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 0}
            p2p.Connect(args)
            # This Cancel() is expected to succeed; a DBusException raised
            # here is not caught.
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4815
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def introspect():
        return if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)

    res = introspect()
    logger.info("Initial Introspect: " + str(res))
    expected_names = ["Introspectable", "GroupStarted", "FastReauth",
                      "PassiveScan"]
    if res is None or any(name not in res for name in expected_names):
        raise Exception("Unexpected initial Introspect response: " + str(res))

    # Allocation failure in wpa_dbus_introspect itself: no response at all.
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        res2 = introspect()
        logger.info("Introspect: " + str(res2))
        if res2 is not None:
            raise Exception("Unexpected Introspect response")

    # Allocation failures further down the call chain: a response is still
    # expected, but it must be shorter than the complete one.
    partial_failures = [
        (1, "=add_interface;wpa_dbus_introspect"),
        (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
        (2, "=add_interface;wpa_dbus_introspect"),
    ]
    for count, funcs in partial_failures:
        with alloc_fail(dev[0], count, funcs):
            res2 = introspect()
            logger.info("Introspect: " + str(res2))
            if res2 is None:
                raise Exception("No Introspect response")
            if len(res2) >= len(res):
                raise Exception("Unexpected Introspect response")
4861
def test_dbus_ap(dev, apdev):
    """D-Bus AddNetwork for AP mode"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    class TestDbusConnect(TestDbus):
        # Adds a mode=2 network over D-Bus and waits for State "completed".
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.started = False
            # Initialised here so the attribute exists even if the
            # NetworkSelected signal is never delivered.
            self.network_selected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                self.started = True
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'mode': 2,
                                    'frequency': 2412},
                                   signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            return self.started

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        # With the AP reported as started, a second station connects to it.
        dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
4919
def test_dbus_connect_wpa_eap(dev, apdev):
    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa-eap"
    ap_params = hostapd.wpa_eap_params(ssid=ssid)
    ap_params["wpa"] = "3"
    ap_params["rsn_pairwise"] = "CCMP"
    hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)

    class TestDbusConnect(TestDbus):
        # Adds a PEAP/MSCHAPV2 network over D-Bus and waits for State
        # "completed"; EAP signals are only logged.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                self.done = True
                self.loop.quit()

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'WPA-EAP',
                                       'eap': 'PEAP',
                                       'identity': 'user',
                                       'password': 'password',
                                       'ca_cert': 'auth_serv/ca.pem',
                                       'phase2': 'auth=MSCHAPV2',
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            return self.done

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4975
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    wpas0 = dev[0]
    # The body switches to AP_SCAN 2; AP_SCAN 1 is requested again afterwards
    # whether or not the body raised.
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        wpas0.request("AP_SCAN 1")
4982
def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    # Body of test_dbus_ap_scan_2_ap_mode_scan; the caller requests AP_SCAN 1
    # again afterwards.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    wpas0 = dev[0]

    if "OK" not in wpas0.request("AP_SCAN 2"):
        raise Exception("Failed to set AP_SCAN 2")

    # Open AP-mode (mode=2) network on 2412 MHz.
    netid = wpas0.add_network()
    wpas0.set_network(netid, "mode", "2")
    wpas0.set_network_quoted(netid, "ssid", "wpas-ap-open")
    for field, value in [("key_mgmt", "NONE"),
                         ("frequency", "2412"),
                         ("scan_freq", "2412"),
                         ("disabled", "0")]:
        wpas0.set_network(netid, field, value)
    wpas0.select_network(netid)
    if wpas0.wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) is None:
        raise Exception("AP failed to start")

    def wait_scan_failed(suffix):
        # Wait for a scan failure and reject AP-DISABLED; suffix tags the
        # error text for the retry round.
        ev = wpas0.wait_event(["CTRL-EVENT-SCAN-FAILED",
                               "AP-DISABLED"], timeout=5)
        if ev is None:
            raise Exception("CTRL-EVENT-SCAN-FAILED not seen" + suffix)
        if "AP-DISABLED" in ev:
            raise Exception("Unexpected AP-DISABLED event" + suffix)
        return ev

    with fail_test(wpas0, 1, "wpa_driver_nl80211_scan"):
        iface.Scan({'Type': 'active',
                    'AllowRoam': True,
                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
        first = wait_scan_failed("")
        if "retry=1" in first:
            # Wait for the scan retry to happen as well.
            wait_scan_failed(" - retry")

    # The AP must still accept a station after the failed scan.
    dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
    dev[1].request("DISCONNECT")
    dev[1].wait_disconnected()
    wpas0.request("DISCONNECT")
    wpas0.wait_disconnected()
5026
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    open_ssid = "test-open"
    hapd = hostapd.add_ap(apdev[0]['ifname'], {"ssid": open_ssid})
    dev[0].connect(open_ssid, key_mgmt="NONE", scan_freq="2412")

    # Coverage only: the call is made to go through the code path; its
    # effect is not verified.
    wpas.ExpectDisconnect()
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
5041
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    # SaveConfig() is expected to be refused with this specific error text.
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
    else:
        raise Exception("SaveConfig() accepted unexpectedly")
5052
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    wpas0 = dev[0]
    # Vendor elements for frame ID 1 are removed afterwards whether or not
    # the body raised.
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        wpas0.request("VENDOR_ELEM_REMOVE 1 *")
5059
def _test_dbus_vendor_elem(dev, apdev):
    # Body of test_dbus_vendor_elem: exercises VendorElemAdd, VendorElemGet
    # and VendorElemRem over D-Bus, covering both rejected and accepted calls.
    # The caller removes any leftover vendor elements afterwards.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def expect_invalid(method, args, detail, accepted_msg, error_prefix):
        # Call iface.<method>(*args) and require a D-Bus error whose text
        # contains both "InvalidArgs" and detail. accepted_msg is raised if
        # the call succeeds; error_prefix introduces any unexpected error.
        try:
            getattr(iface, method)(*args)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e) or detail not in str(e):
                raise Exception(error_prefix + str(e))
            return
        raise Exception(accepted_msg)

    def expect_data(val, expected, tag):
        # Compare a VendorElemGet() result with the expected byte string;
        # tag distinguishes the individual checks in the error text.
        if len(val) != len(expected):
            raise Exception("Unexpected VendorElemGet length" + tag)
        for got, want in zip(val, expected):
            if got != dbus.Byte(want):
                raise Exception("Unexpected VendorElemGet data" + tag)

    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    add_ok = "Invalid VendorElemAdd() accepted"
    add_err = "Unexpected error message for invalid VendorElemAdd[%d]: "
    expect_invalid("VendorElemAdd", (-1, dbus.ByteArray("\x00\x00")),
                   "Invalid ID", add_ok, add_err % 1)
    expect_invalid("VendorElemAdd", (1, dbus.ByteArray("")),
                   "Invalid value", add_ok, add_err % 2)
    expect_invalid("VendorElemAdd", (1, dbus.ByteArray("\x00\x01")),
                   "Parse error", add_ok, add_err % 3)

    get_ok = "Invalid VendorElemGet() accepted"
    get_err = "Unexpected error message for invalid VendorElemGet[%d]: "
    expect_invalid("VendorElemGet", (-1,),
                   "Invalid ID", get_ok, get_err % 1)
    expect_invalid("VendorElemGet", (1,),
                   "ID value does not exist", get_ok, get_err % 2)

    # Each rejected VendorElemRem() call carries its own index so that a
    # failure can be traced to the exact check.
    rem_ok = "Invalid VendorElemRemove() accepted"
    rem_err = "Unexpected error message for invalid VendorElemRemove[%d]: "
    expect_invalid("VendorElemRem", (-1, dbus.ByteArray("\x00\x00")),
                   "Invalid ID", rem_ok, rem_err % 1)
    expect_invalid("VendorElemRem", (1, dbus.ByteArray("")),
                   "Invalid value", rem_ok, rem_err % 2)

    iface.VendorElemRem(1, "*")

    ie = dbus.ByteArray("\x00\x01\x00")
    iface.VendorElemAdd(1, ie)
    expect_data(iface.VendorElemGet(1), ie, "")

    # A second element is expected to be appended after the first one.
    ie2 = dbus.ByteArray("\xe0\x00")
    iface.VendorElemAdd(1, ie2)
    expect_data(iface.VendorElemGet(1), ie + ie2, "[2]")

    expect_invalid("VendorElemRem", (1, dbus.ByteArray("\x01\x01")),
                   "Parse error", rem_ok, rem_err % 3)

    # Removing the first element must leave data of the second one's length.
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    iface.VendorElemRem(1, "*")
    expect_invalid("VendorElemGet", (1,),
                   "ID value does not exist",
                   "Invalid VendorElemGet() accepted after removal",
                   "Unexpected error message for invalid VendorElemGet after removal: ")