]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
D-Bus: Send P2P IP address assignment info with GroupStarted event
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67 1# wpa_supplicant D-Bus interface tests
910eb5b5 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
2ec82e67
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
2ec82e67
JM
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12
910eb5b5 13try:
b4de353c 14 import gobject
910eb5b5
JM
15 import dbus
16 dbus_imported = True
17except ImportError:
18 dbus_imported = False
19
2ec82e67
JM
20import hostapd
21from wpasupplicant import WpaSupplicant
708ec753 22from utils import HwsimSkip, alloc_fail, fail_test
1992af11 23from p2p_utils import *
2ec82e67 24from test_ap_tdls import connect_2sta_open
089e7ca3 25from test_ap_eap import check_altsubject_match_support
2ec82e67
JM
26
27WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
28WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
29WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
30WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
31WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
32WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
33WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
34WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
35WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
36WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
37
def prepare_dbus(dev):
    """Connect to wpa_supplicant over the D-Bus system bus.

    Installs the GLib main loop as the default D-Bus main loop, looks up the
    wpa_supplicant root object and resolves the object path of the interface
    named by dev.ifname.

    Returns a (bus, wpas_obj, path, if_obj) tuple.
    Raises HwsimSkip if the dbus module could not be imported or if any step
    of the connection setup fails.
    """
    if not dbus_imported:
        logger.info("No dbus module available")
        raise HwsimSkip("No dbus module available")
    try:
        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
        wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
        path = wpas.GetInterface(dev.ifname)
        if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
        return (bus, wpas_obj, path, if_obj)
    except Exception as e:
        # Any setup failure is reported as a skipped test rather than a
        # test failure.
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
2ec82e67
JM
53
class TestDbus(object):
    """Base class for D-Bus signal tests driven by a gobject main loop.

    Subclasses are used as context managers; this class provides only
    __exit__ (which removes every registered signal receiver), so a subclass
    has to supply __enter__.
    """

    def __init__(self, bus):
        self.bus = bus
        self.signals = []
        self.loop = gobject.MainLoop()

    def __exit__(self, type, value, traceback):
        # Drop all receivers registered through add_signal().
        for receiver in self.signals:
            receiver.remove()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal 'name' on 'interface' and track it."""
        receiver = self.bus.add_signal_receiver(handler,
                                                dbus_interface=interface,
                                                signal_name=name,
                                                byte_arrays=byte_arrays)
        self.signals.append(receiver)

    def timeout(self, *args):
        """Timer callback: stop the main loop; False means do not re-arm."""
        logger.debug("timeout")
        self.loop.quit()
        return False
74
0e126c6d
JM
class alloc_fail_dbus(object):
    """Context manager that expects a D-Bus call to fail on allocation failure.

    On entry the device is sent "TEST_ALLOC_FAIL <count>:<funcs>". On exit
    the body must have raised: a DBusException whose text contains
    'expected' is swallowed, anything else is re-raised after checking via
    GET_ALLOC_FAIL that the failure was actually triggered.
    """

    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):
        self._dev = dev
        self._count = count
        self._funcs = funcs
        self._operation = operation
        self._expected = expected

    def __enter__(self):
        reply = self._dev.request("TEST_ALLOC_FAIL %d:%s" % (self._count,
                                                             self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        if type is None:
            # The body completed without any exception.
            raise Exception("%s succeeded during out-of-memory" % self._operation)
        got_expected = (type == dbus.exceptions.DBusException and
                        self._expected in str(value))
        if got_expected:
            return True
        state = self._dev.request("GET_ALLOC_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" % self._operation)
        # Let the original (unexpected) exception propagate.
        return False
95
3a59cda1
JM
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Add an AP with WPS enabled (WPA2-PSK/CCMP) and return hostapd.add_ap()'s result."""
    params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "ap_pin": "12345670",
        "uuid": ap_uuid,
    }
    return hostapd.add_ap(ap, params)
2ec82e67
JM
103
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))

    # Dump the properties of the interface object and of its WPS interface.
    for iface in (WPAS_DBUS_IFACE, WPAS_DBUS_IFACE_WPS):
        props = if_obj.GetAll(iface, dbus_interface=dbus.PROPERTIES_IFACE)
        logger.debug("GetAll(%s, %s): %s" % (iface, path, str(props)))

    # Both lists are expected to be empty before anything is scanned/added.
    empty_checks = [('BSSs', "Unexpected BSSs entry: "),
                    ('Networks', "Unexpected Networks entry: ")]
    for (prop, msg) in empty_checks:
        res = if_obj.Get(WPAS_DBUS_IFACE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if len(res) != 0:
            raise Exception(msg + str(res))

    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_paths) != 1:
        raise Exception("Missing BSSs entry: " + str(bss_paths))
    bss_path = bss_paths[0]
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_path)
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_path, str(props)))
    # Render the BSSID octets as colon-separated lowercase hex.
    bssid_str = ':'.join('%02x' % octet for octet in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    net_paths = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(net_paths) != 1:
        raise Exception("Missing Networks entry: " + str(net_paths))
    net_path = net_paths[0]
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, net_path)
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, net_path, str(props)))
    ssid = props['Properties']['ssid']
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
163
73ece491
JM
def test_dbus_getall_oom(dev, apdev):
    """D-Bus GetAll wpa_config_get_all() OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    # Run GetAll once for each of the first 49 allocation-failure counts
    # tagged with wpa_config_get_all.
    for i in range(1, 50):
        with alloc_fail(dev[0], i, "wpa_config_get_all"):
            try:
                net_obj.GetAll(WPAS_DBUS_NETWORK,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException:
                # A D-Bus error is tolerated here; any other exception
                # type still propagates.
                pass
184
2ec82e67
JM
185def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
186 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
187 dbus_interface=dbus.PROPERTIES_IFACE,
188 byte_arrays=byte_arrays)
189 if expect is not None and val != expect:
190 raise Exception("Unexpected %s: %s (expected: %s)" %
191 (prop, str(val), str(expect)))
192 return val
193
def dbus_set(dbus, wpas_obj, prop, val):
    """Write 'val' to property 'prop' of the fi.w1.wpa_supplicant1 interface."""
    props_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, dbus_interface=props_iface)
197
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def expect_error(err, accepted_msg, func, *args, **kwargs):
        # Call func and require a DBusException whose text contains err.
        # Success of the call is reported with accepted_msg.
        try:
            func(*args, **kwargs)
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception(accepted_msg)

    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    for (val, err) in [(3, "Error.Failed: wrong property type"),
                       ("foo", "Error.Failed: wrong debug level value")]:
        expect_error(err, "Invalid DebugLevel value accepted: " + str(val),
                     dbus_set, dbus, wpas_obj, "DebugLevel", val)
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # The two boolean debug properties go through the same sequence:
    # toggle off, reject a non-boolean value, toggle back on.
    for prop in ("DebugTimestamp", "DebugShowKeys"):
        dbus_get(dbus, wpas_obj, prop, expect=True)
        dbus_set(dbus, wpas_obj, prop, False)
        dbus_get(dbus, wpas_obj, prop, expect=False)
        expect_error("Error.Failed: wrong property type",
                     "Invalid %s value accepted" % prop,
                     dbus_set, dbus, wpas_obj, prop, "foo")
        dbus_set(dbus, wpas_obj, prop, True)
        dbus_get(dbus, wpas_obj, prop, expect=True)

    res = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(res) != 1:
        raise Exception("Unexpected Interfaces value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(res) < 5 or "TTLS" not in res:
        raise Exception("Unexpected EapMethods value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(res) < 2 or "p2p" not in res:
        raise Exception("Unexpected Capabilities value: " + str(res))

    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    val = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if val != res:
        raise Exception("WFDIEs value changed")
    # Bytes literals keep the ByteArray arguments valid on Python 3 as well.
    expect_error("InvalidArgs", "Invalid WFDIEs value accepted",
                 dbus_set, dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00'))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(res) != 0:
        raise Exception("WFDIEs not cleared properly")

    res = dbus_get(dbus, wpas_obj, "EapMethods")
    expect_error("InvalidArgs: Property is read-only", "Invalid Set accepted",
                 dbus_set, dbus, wpas_obj, "EapMethods", res)

    # The lambda keeps the attribute lookup of the unknown method inside
    # the guarded call, as in a plain try block.
    expect_error("UnknownMethod", "Unknown method accepted",
                 lambda: wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys",
                                         True,
                                         dbus_interface=dbus.PROPERTIES_IFACE))

    expect_error("InvalidArgs: No such property", "Invalid Get accepted",
                 wpas_obj.Get, "foo", "DebugShowKeys",
                 dbus_interface=dbus.PROPERTIES_IFACE)

    # A proxy created with introspect=False is used for the calls that pass
    # integers where the Properties interface takes strings.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    expect_error("InvalidArgs: Invalid arguments", "Invalid Get accepted",
                 test_obj.Get, 123, "DebugShowKeys",
                 dbus_interface=dbus.PROPERTIES_IFACE)
    expect_error("InvalidArgs: Invalid arguments", "Invalid Get accepted",
                 test_obj.Get, WPAS_DBUS_SERVICE, 123,
                 dbus_interface=dbus.PROPERTIES_IFACE)

    expect_error("InvalidArgs: invalid message format", "Invalid Set accepted",
                 wpas_obj.Set, WPAS_DBUS_SERVICE, "WFDIEs",
                 dbus.ByteArray(b'', variant_level=2),
                 dbus_interface=dbus.PROPERTIES_IFACE)
320
f0ef6889
DW
def test_dbus_set_global_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # (property name, value expected before Set, value written and re-read)
    props = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]

    for (name, initial, updated) in props:
        current = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != initial:
            raise Exception("Unexpected " + name + " value: " + str(current))

        if_obj.Set(WPAS_DBUS_IFACE, name, updated,
                   dbus_interface=dbus.PROPERTIES_IFACE)

        current = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != updated:
            raise Exception("Unexpected " + name + " value after set: " + str(current))
340
2ec82e67
JM
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # A method that does not exist on the WPS interface must be rejected.
    try:
        wps.Foo()
        raise Exception("Unknown method accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    # Call WPS.Start with an integer argument through a proxy created with
    # introspect=False.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
    try:
        test_wps.Start(123)
        raise Exception("WPS.Start with incorrect signature accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
361
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        # Always put back the settings the test body modifies.
        sta = dev[0]
        sta.request("SET wps_cred_processing 0")
        sta.request("SET config_methods display keypad virtual_display nfc_interface p2ps")
2ec82e67
JM
369
def _test_dbus_get_set_wps(dev, apdev):
    """Exercise Get/Set of the WPS ConfigMethods and ProcessCredentials properties.

    Also verifies that changing ProcessCredentials emits PropertiesChanged
    both on the WPS interface and on the standard D-Bus Properties interface.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        cred = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE)
        # Only wps_cred_processing=1 is expected to read back as False.
        expected_val = i != 1
        if cred != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, cred))

    class TestDbusGetSet(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # This must be the same flag the signal handlers test; with a
            # different attribute name the loop could only end via timeout.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
460
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Every one of these WPS.Start() argument sets must be rejected with an
    # InvalidArgs error.
    failures = [ {'Role': 'foo', 'Type': 'pbc'},
                 {'Role': 123, 'Type': 'pbc'},
                 {'Type': 'pbc'},
                 {'Role': 'enrollee'},
                 {'Role': 'registrar'},
                 {'Role': 'enrollee', 'Type': 123},
                 {'Role': 'enrollee', 'Type': 'foo'},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'Bssid': '02:33:44:55:66:77'},
                 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'Bssid': dbus.ByteArray(b'12345')},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'P2PDeviceAddress': 12345},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'P2PDeviceAddress': dbus.ByteArray(b'12345')},
                 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
    for args in failures:
        try:
            wps.Start(args)
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
490
0e126c6d
JM
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
        if_obj.Get(WPAS_DBUS_IFACE, "State",
                   dbus_interface=dbus.PROPERTIES_IFACE)

    hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)

    time.sleep(0.05)
    for i in range(1, 3):
        with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                    dbus_interface=dbus.PROPERTIES_IFACE)
    with alloc_fail(dev[0], 1,
                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
        try:
            bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                        dbus_interface=dbus.PROPERTIES_IFACE)
        except dbus.exceptions.DBusException:
            # A D-Bus error is tolerated for this call; other exception
            # types still propagate.
            pass

    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    for i in range(1, 3):
        with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    for i in range(1, 6):
        with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        val2 = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
                         "WPS.Start",
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
550
2ec82e67
JM
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # Always reset credential processing, whatever the outcome.
        sta = dev[0]
        sta.request("SET wps_cred_processing 0")
557
def _test_dbus_wps_pbc(dev, apdev):
    """Run WPS PBC via the D-Bus WPS interface and wait for its signals.

    Checks the WPS dictionary of the scanned BSS first, then requires both
    an Event("success") and a Credentials signal before the timeout.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_paths) != 1:
        raise Exception("Missing BSSs entry: " + str(bss_paths))
    bss_path = bss_paths[0]
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_path)
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_path, str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    wps_info = props['WPS']
    if 'Type' not in wps_info:
        raise Exception("No Type field in the WPS dictionary")
    if wps_info['Type'] != 'pbc':
        raise Exception("Unexpected WPS Type: " + wps_info['Type'])

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            # Start PBC almost immediately; give up after 15 seconds.
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            start_args = {'Role': 'enrollee', 'Type': 'pbc'}
            self.wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
627
3a59cda1
JM
def test_dbus_wps_pbc_overlap(dev, apdev):
    """D-Bus WPS/PBC operation and signal for PBC overlap"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Two APs with PBC active at the same time.
    hapd = start_ap(apdev[0])
    hapd2 = start_ap(apdev[1], ssid="test-wps2",
                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
    hapd.request("WPS_PBC")
    hapd2.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq="2412")

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.overlap_seen = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "pbc-overlap":
                return
            self.overlap_seen = True
            self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            start_args = {'Role': 'enrollee', 'Type': 'pbc'}
            self.wps.Start(start_args)
            return False

        def success(self):
            return self.overlap_seen

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].request("WPS_CANCEL")
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
678
2ec82e67
JM
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        # Always reset credential processing, whatever the outcome.
        sta = dev[0]
        sta.request("SET wps_cred_processing 0")
685
def _test_dbus_wps_pin(dev, apdev):
    """Run WPS PIN enrollment via D-Bus with a fixed PIN and wait for signals.

    Requires both an Event("success") and a Credentials signal before the
    15 second timeout, then waits for the connection to complete.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # binascii.unhexlify works on Python 2 and 3, unlike
            # str.decode('hex').
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': bssid_ay})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
739
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        # Always reset credential processing, whatever the outcome.
        sta = dev[0]
        sta.request("SET wps_cred_processing 0")
746
def _test_dbus_wps_pin2(dev, apdev):
    """Run WPS PIN enrollment via D-Bus using the PIN returned by WPS.Start().

    The PIN from the Start() reply is passed to the AP with WPS_PIN. Both
    an Event("success") and a Credentials signal are required before the
    15 second timeout.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Must exist before any signal arrives: wpsEvent() and success()
            # read it, and the event may precede the Credentials signal.
            self.credentials_received = False
            self.failed = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # binascii.unhexlify works on Python 2 and 3, unlike
            # str.decode('hex').
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            pin = res['Pin']
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
802
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        # Always reset credential processing, whatever the outcome.
        sta = dev[0]
        sta.request("SET wps_cred_processing 0")
809
def _test_dbus_wps_pin_m2d(dev, apdev):
    """Run WPS PIN enrollment via D-Bus where the AP gets the PIN only after M2D.

    The PIN is given to the AP from the "m2d" event handler. Both an
    Event("success") and a Credentials signal are required before the
    15 second timeout.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()
            elif name == "m2d":
                # Enable the PIN on the AP only once M2D has been reported.
                h = hostapd.Hostapd(apdev[0]['ifname'])
                h.request("WPS_PIN any 12345670")

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # binascii.unhexlify works on Python 2 and 3, unlike
            # str.decode('hex').
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': bssid_ay})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
865
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    # Delegate to the real test body; wps_cred_processing is reset to 0
    # afterwards regardless of success or failure, since the body sets it
    # to 2.
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
872
def _test_dbus_wps_reg(dev, apdev):
    # Body of test_dbus_wps_reg: start a WPS operation with Role=registrar
    # through the D-Bus WPS interface and wait for the Credentials signal.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        # Signal collector; the main loop is stopped by the first
        # Credentials signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            # Schedule the WPS start and the timeout handler, hook up the
            # signals and then block in the main loop until it is stopped.
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            # Events are only logged in this test.
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            # binascii.unhexlify() replaces the Python-2-only
            # str.decode('hex') and yields the same raw BSSID bytes.
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'registrar', 'Type': 'pin',
                       'Pin': '12345670', 'Bssid': bssid_ay})
            # Returning False keeps the glib timeout from firing again.
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
920
2a8e3f35
JM
def test_dbus_wps_cancel(dev, apdev):
    """D-Bus WPS Cancel operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']

    # Cancel() is exercised twice: once with no WPS operation started and
    # once right after Start().
    wps.Cancel()
    dev[0].scan_for_bss(bssid, freq="2412")
    # binascii.unhexlify() replaces the Python-2-only str.decode('hex') and
    # yields the same raw BSSID bytes.
    bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
    wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
               'Bssid': bssid_ay})
    wps.Cancel()
    # The result of this wait is intentionally not checked.
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
936
2ec82e67
JM
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Seventeen SSIDs, "1" through "17", for the too-many-SSIDs case.
    many_ssids = [dbus.ByteArray(str(i).encode()) for i in range(1, 18)]

    # Each entry is (Scan() argument dictionary, substring that must appear
    # in the resulting D-Bus error). Byte strings are written as bytes
    # literals so that dbus.ByteArray gets bytes on Python 3 as well.
    tests = [({}, "InvalidArgs"),
             ({'Type': 123}, "InvalidArgs"),
             ({'Type': 'foo'}, "InvalidArgs"),
             ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
             ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
             ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
             ({'Type': 'active', 'SSIDs': many_ssids}, "InvalidArgs"),
             # A single 33-byte SSID.
             ({'Type': 'active',
               'SSIDs': [dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")]},
              "InvalidArgs"),
             ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
             ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
             ({'Type': 'active', 'Channels': 2412}, "InvalidArgs"),
             ({'Type': 'active', 'Channels': [2412]}, "InvalidArgs"),
             ({'Type': 'active',
               'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]},
              "InvalidArgs"),
             ({'Type': 'active',
               'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]},
              "InvalidArgs"),
             ({'Type': 'active', 'AllowRoam': "yes"}, "InvalidArgs"),
             ({'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]},
              "InvalidArgs"),
             ({'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]},
              "InvalidArgs")]
    for (t, err) in tests:
        try:
            iface.Scan(t)
            raise Exception("Invalid Scan() arguments accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
984
0e126c6d
JM
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Channel list shared by all Scan() requests below; a fresh list is
    # built for every call.
    def channels():
        return [(dbus.UInt32(2412), dbus.UInt32(20))]

    # Each section triggers one Scan() call inside an alloc_fail_dbus()
    # context naming the function(s) for the injected allocation failure.
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_scan_clone_params;wpas_dbus_handler_scan",
                         "Scan", expected="ScanError: Scan request rejected"):
        iface.Scan({'Type': 'passive',
                    'Channels': channels()})

    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'passive',
                    'Channels': channels()})

    # Bytes literals keep dbus.ByteArray usable on Python 3 as well; on
    # Python 2 they are identical to the plain string literals.
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'active',
                    'IEs': [dbus.ByteArray(b"\xdd\x00")],
                    'Channels': channels()})

    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'active',
                    'SSIDs': [dbus.ByteArray(b"open"),
                              dbus.ByteArray()],
                    'Channels': channels()})
1016
2ec82e67
JM
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})

    class TestDbusScan(TestDbus):
        # Runs three scans back to back (each ScanDone triggers the next
        # request) and records whether a BSSAdded signal was seen.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = 0
            self.bss_added = False
            self.fail_reason = None

        def __enter__(self):
            # Schedule the first scan and the timeout handler, hook up the
            # signals and then block in the main loop until it is stopped.
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
            self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
            self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            if self.scan_completed == 1:
                iface.Scan({'Type': 'passive',
                            'AllowRoam': True,
                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
            elif self.scan_completed == 2:
                iface.Scan({'Type': 'passive',
                            'AllowRoam': False})
            elif self.bss_added and self.scan_completed == 3:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            # The AP is started without WPS, so a WPS 'Type' entry is
            # treated as a failure.
            if 'WPS' in properties:
                if 'Type' in properties['WPS']:
                    self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                    self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            # Bytes literals keep dbus.ByteArray usable on Python 3 as
            # well; on Python 2 they equal the plain string literals.
            iface.Scan({'Type': 'active',
                        'SSIDs': [dbus.ByteArray(b"open"),
                                  dbus.ByteArray()],
                        'IEs': [dbus.ByteArray(b"\xdd\x00"),
                                dbus.ByteArray()],
                        'AllowRoam': False,
                        'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
            # Returning False keeps the glib timeout from firing again.
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as t:
        if t.fail_reason:
            raise Exception(t.fail_reason)
        if not t.success():
            raise Exception("Expected signals not seen")

    # The BSSs property must list at least one entry before FlushBSS(0) and
    # none after it.
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
1097
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan through the control interface and wait until it has
    # actually started before issuing the D-Bus request.
    if "OK" not in dev[0].request("SCAN freq=2412-2462"):
        raise Exception("Failed to start scan")
    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
    if ev is None:
        raise Exception("Scan start timed out")

    # The D-Bus Scan() issued while the first scan is running must fail
    # with a ScanError.
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
        raise Exception("Scan() accepted when busy")
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
    if ev is None:
        raise Exception("Scan timed out")
1119
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Drives a small state machine from the "State" values delivered in
        # PropertiesChanged signals. self.state advances 0..9 as
        # connect/disconnect steps complete; 9 is the final state.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # Schedule the initial connection and the timeout handler,
            # register the signal handlers and run the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            # None when the signal carries no State entry.
            reported = properties.get('State')

            if reported == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    iface.Disconnect()
                elif self.state == 7:
                    self.state = 8
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        # -1 can never become 9, so success() will fail.
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)

            if reported == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    iface.Reconnect()
                elif self.state == 8:
                    self.state = 9
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'WPA-PSK',
                                       'psk': passphrase,
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot glib timeout.
            return False

        def success(self):
            # All three network signals must have been seen and the state
            # machine must have reached its final state.
            if not (self.network_added and
                    self.network_removed and
                    self.network_selected):
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1222
53f4ed68
JM
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Adds a network without a psk entry (mem_only_psk=1) and supplies
        # the passphrase through NetworkReply() when a PSK_PASSPHRASE
        # NetworkRequest signal arrives.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            # Schedule the connection and the timeout handler, register the
            # signal handlers and run the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            # The first "completed" state ends the test.
            if properties.get('State') == "completed":
                self.connected = True
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field == "PSK_PASSPHRASE":
                # The passphrase is sent wrapped in double quotes.
                iface.NetworkReply(path, field, '"%s"' % passphrase)

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'WPA-PSK',
                                       'mem_only_psk': 1,
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot glib timeout.
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1276
0e126c6d
JM
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Probe for TEST_ALLOC_FAIL support before doing any real work.
    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # State machine driven by the "State" values delivered in
        # PropertiesChanged signals; 7 is the final state. D-Bus calls in
        # run_connect() may fail while allocation failures are being
        # injected, so they are wrapped and stop the loop on error.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # Note the short (1.5 s) timeout compared to the other tests in
            # this file; this context is entered many times in a loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        # -1 can never become 7, so success() will fail.
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'scan_freq': 2412},
                                   signature='sv')
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            # One-shot glib timeout.
            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Repeat the connection sequence with TEST_ALLOC_FAIL armed with an
    # increasing count. The loop ends after five iterations in which
    # GET_ALLOC_FAIL no longer reports a value starting with "0:".
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

                state = dev[0].request('GET_ALLOC_FAIL')
                logger.info("GET_ALLOC_FAIL: " + state)
                dev[0].dump_monitor()
                dev[0].request("TEST_ALLOC_FAIL 0:")
                if i < 3:
                    # NOTE(review): this is raised for every i < 3 and is
                    # caught by the handler below, so it only ends the
                    # iteration; it does not fail the test.
                    raise Exception("Connection succeeded during out-of-memory")
                if not state.startswith('0:'):
                    count += 1
                    if count == 5:
                        break
        except Exception as e:
            # Failures are expected while allocations are failing, so the
            # iteration is simply abandoned. Catching Exception instead of
            # using a bare except lets KeyboardInterrupt/SystemExit through,
            # and the error is logged instead of being dropped silently.
            logger.info("Iteration %d - exception ignored: %s" % (i, str(e)))

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1418
2ec82e67
JM
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Both methods must be rejected with a NotConnected error; the same
    # check is applied to each of them in turn.
    for method in ("Disconnect", "Reattach"):
        try:
            getattr(iface, method)()
            raise Exception("%s() accepted when not connected" % method)
        except dbus.exceptions.DBusException as e:
            if "NotConnected" not in str(e):
                raise Exception("Unexpected error message for invalid %s: %s" % (method, str(e)))
1437
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # State machine (self.state 0..5):
        #   0 -> 1: first connection completed; EAPLogoff() and set an
        #           altsubject_match equal to the server's dNSName
        #   1 -> 2: disconnected; EAPLogon() and select the network again
        #   2 -> 3: connected again; Disconnect() and set a non-matching
        #           altsubject_match
        #   3 -> 4: disconnected; select the network once more
        #   4 -> 5: EAP signal reports 'AltSubject mismatch'; stop the loop
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            self.state = 0

        def __enter__(self):
            # Schedule the connection and the timeout handler, register the
            # signal handlers and run the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def _set_altsubject_match(self, suffix):
            # Update altsubject_match of the added network to the dNSName
            # learned from the Certification signal plus the given suffix.
            net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
            constraint = dbus.Dictionary({'altsubject_match':
                                          self.server_dnsname + suffix},
                                         signature='sv')
            net_obj.Set(WPAS_DBUS_NETWORK, "Properties", constraint,
                        dbus_interface=dbus.PROPERTIES_IFACE)

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            # None when the signal carries no State entry.
            reported = properties.get('State')

            if reported == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    self._set_altsubject_match("")
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    self._set_altsubject_match("FOO")

            if reported == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                if self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            if len(args['altsubject']) < 1:
                raise Exception("Missing dNSName")
            dnsname = args['altsubject'][0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            mismatch = (status == 'remote certificate verification' and
                        parameter == 'AltSubject mismatch')
            if self.state == 4 and mismatch:
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            # The network is added without a password; supply it on request.
            if field == "PASSWORD":
                iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'IEEE8021X',
                                       'eapol_flags': 0,
                                       'eap': 'TTLS',
                                       'anonymous_identity': 'ttls',
                                       'identity': 'pap user',
                                       'ca_cert': 'auth_serv/ca.pem',
                                       'phase2': 'auth=PAP',
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot glib timeout.
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 5

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1552
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # scan_freq given as an integer must read back as "2412".
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'WPA-PSK',
                            'psk': "12345678",
                            'identity': dbus.ByteArray([1, 2]),
                            'priority': dbus.Int32(0),
                            'scan_freq': dbus.UInt32(2412)},
                           signature='sv')
    netw = iface.AddNetwork(args)
    netid = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(netid, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # Frequency lists given as space-separated strings.
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'NONE',
                            'scan_freq': "2412 2432",
                            'freq_list': "2412 2417 2432"},
                           signature='sv')
    netw = iface.AddNetwork(args)
    netid = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(netid, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(netid, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # Operations on the already removed network must fail.
    try:
        iface.RemoveNetwork(netw)
        raise Exception("Invalid RemoveNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    try:
        iface.SelectNetwork(netw)
        raise Exception("Invalid SelectNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            # This message used to name RemoveNetwork by mistake.
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))

    args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
                            'identity': "testuser", 'scan_freq': '2412'},
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: initially False, settable to True and back, and a
    # non-boolean value must be rejected.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(Enabled,1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))

    # Updating one entry through Properties must leave the others alone.
    args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # Second call runs with no networks left.
    iface.RemoveAllNetworks()

    # Argument dictionaries that AddNetwork() must reject with InvalidArgs.
    tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'),
             dbus.Dictionary({'identity': dbus.ByteArray()},
                             signature='sv'),
             dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv'),
             dbus.Dictionary({'identity': ""}, signature='sv')]
    for args in tests:
        try:
            iface.AddNetwork(args)
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1665
0e126c6d
JM
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
                            'identity': "testuser", 'scan_freq': '2412'},
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)

    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                               signature='sv')
        try:
            netw = iface.AddNetwork(args)
            # Currently, AddNetwork() succeeds even if os_strdup() for path
            # fails, so remove the network if that occurs.
            iface.RemoveNetwork(netw)
        except dbus.exceptions.DBusException:
            # A D-Bus error is an acceptable outcome here as well.
            pass

    # Reuses the "foo2" argument dictionary from the block above.
    for i in range(1, 3):
        with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
            try:
                netw = iface.AddNetwork(args)
                # Currently, AddNetwork() succeeds even if network registration
                # fails, so remove the network if that occurs.
                iface.RemoveNetwork(netw)
            except dbus.exceptions.DBusException:
                # A D-Bus error is an acceptable outcome here as well.
                pass

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                               signature='sv')
        netw = iface.AddNetwork(args)

    # Each entry is (allocation count, function list, AddNetwork arguments);
    # every one of them is run with "InvalidArgs" as the expected error.
    # Bytes literals keep dbus.ByteArray usable on Python 3 as well.
    tests = [(1,
              'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
              dbus.Dictionary({'ssid': dbus.ByteArray(b' ')},
                              signature='sv')),
             (1, '=set_network_properties;wpas_dbus_handler_add_network',
              dbus.Dictionary({'ssid': 'foo'}, signature='sv')),
             (1, '=set_network_properties;wpas_dbus_handler_add_network',
              dbus.Dictionary({'eap': 'foo'}, signature='sv')),
             (1, '=set_network_properties;wpas_dbus_handler_add_network',
              dbus.Dictionary({'priority': dbus.UInt32(1)},
                              signature='sv')),
             (1, '=set_network_properties;wpas_dbus_handler_add_network',
              dbus.Dictionary({'priority': dbus.Int32(1)},
                              signature='sv')),
             (1, '=set_network_properties;wpas_dbus_handler_add_network',
              dbus.Dictionary({'ssid': dbus.ByteArray(b' ')},
                              signature='sv'))]
    for (count, funcs, args) in tests:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            netw = iface.AddNetwork(args)

    # None of the failed attempts may have left a network block behind,
    # neither on D-Bus nor in the control interface network list.
    if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                      dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
        raise Exception("Unexpected network block added")
    if len(dev[0].list_networks()) > 0:
        raise Exception("Unexpected network block visible")
1745
2ec82e67
JM
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    # Thin wrapper: the checks themselves live in _test_dbus_interface(); the
    # cleanup below runs whether or not those checks pass.
    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        # Switch the country to US and then back to 00, giving each change up
        # to one second to be reported through wait_event() and, failing
        # that, wait_global_event(). A missing event is not an error here.
        for set_country_cmd in ("SET country US", "SET country 00"):
            dev[0].request(set_country_cmd)
            if dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                 timeout=1) is None:
                dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                         timeout=1)
        subprocess.call(['iw', 'reg', 'set', '00'])
1764
def _test_dbus_interface(dev, apdev):
    """Exercise CreateInterface/GetInterface/RemoveInterface on the root object.

    dev[0] is the station used to reach the wpa_supplicant D-Bus service;
    apdev is unused. Raises Exception when a valid call misbehaves, when an
    invalid call is accepted, or when an invalid call is rejected with an
    unexpected D-Bus error.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    def expect_error(method, arg, expected, accepted_msg, unexpected_msg):
        # Call method(arg) and require a DBusException containing 'expected'.
        try:
            method(arg)
        except dbus.exceptions.DBusException as e:
            if expected not in str(e):
                raise Exception(unexpected_msg + str(e))
        else:
            raise Exception(accepted_msg)

    # Valid case: create an interface for 'lo' and look it up again.
    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # A second CreateInterface() for the same Ifname must be rejected.
    params = dbus.Dictionary({'Ifname': 'lo',
                              'Driver': 'none',
                              'ConfigFile': 'foo',
                              'BridgeIfname': 'foo'},
                             signature='sv')
    expect_error(wpas.CreateInterface, params, "InterfaceExists",
                 "Invalid CreateInterface() accepted",
                 "Unexpected error message for invalid CreateInterface: ")

    # Removing the interface works once; the second attempt must fail.
    wpas.RemoveInterface(path)
    expect_error(wpas.RemoveInterface, path, "InterfaceUnknown",
                 "Invalid RemoveInterface() accepted",
                 "Unexpected error message for invalid RemoveInterface: ")

    # Unknown dictionary entry.
    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none',
                              'Foo': 123},
                             signature='sv')
    expect_error(wpas.CreateInterface, params, "InvalidArgs",
                 "Invalid CreateInterface() accepted",
                 "Unexpected error message for invalid CreateInterface: ")

    # Missing Ifname entry.
    params = dbus.Dictionary({'Driver': 'none'}, signature='sv')
    expect_error(wpas.CreateInterface, params, "InvalidArgs",
                 "Invalid CreateInterface() accepted",
                 "Unexpected error message for invalid CreateInterface: ")

    # The interface was removed above, so the lookup must now fail. The
    # message names GetInterface (it previously said RemoveInterface).
    expect_error(wpas.GetInterface, "lo", "InterfaceUnknown",
                 "Invalid GetInterface() accepted",
                 "Unexpected error message for invalid GetInterface: ")
1821
0e126c6d
JM
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    # dev[0] is the station under test; apdev is unused.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    def lo_params():
        # Fresh CreateInterface() argument dictionary for the 'lo' interface.
        return dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                               signature='sv')

    # Allocation failure while parsing the argument dictionary.
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface",
                         "CreateInterface", "InvalidArgs"):
        wpas.CreateInterface(lo_params())

    # Fail the Nth allocation under wpa_supplicant_add_iface for growing N.
    for count in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % count)
        create_args = lo_params()
        try:
            new_path = wpas.CreateInterface(create_args)
            wpas.RemoveInterface(new_path)
            logger.info("CreateInterface succeeds after %d allocation failures" % count)
            alloc_state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + alloc_state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if count < 5:
                raise Exception("CreateInterface succeeded during out-of-memory")
            # Stop once the reported state no longer starts with '0:'.
            if not alloc_state.startswith('0:'):
                break
        except dbus.exceptions.DBusException:
            # A D-Bus error here is the expected outcome of the forced
            # allocation failure; move on to the next count.
            pass

    # Allocation failure in the handler itself for each string argument.
    for key in ['Driver', 'Ifname', 'ConfigFile', 'BridgeIfname']:
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            wpas.CreateInterface(dbus.Dictionary({key: 'foo'},
                                                 signature='sv'))
1856
2ec82e67
JM
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/GetBlob/RemoveBlob parameters and error cases"""
    # dev[0] is the station whose D-Bus interface object is exercised; apdev
    # is unused. Raises Exception on any unexpected result.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    blob = dbus.ByteArray("\x01\x02\x03")
    iface.AddBlob('blob1', blob)
    try:
        # A second blob under an already used name must be rejected.
        iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
        raise Exception("Invalid AddBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))

    # The stored blob must read back byte for byte.
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for got, sent in zip(res, blob):
        if got != dbus.Byte(sent):
            raise Exception("Unexpected blob data")

    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal, both RemoveBlob() and GetBlob() must fail.
    iface.RemoveBlob('blob1')
    try:
        iface.RemoveBlob('blob1')
        raise Exception("Invalid RemoveBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    try:
        iface.GetBlob('blob1')
        raise Exception("Invalid GetBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))

    class TestDbusBlob(TestDbus):
        # Adds and removes 'blob2' from the main loop and records whether the
        # BlobAdded and BlobRemoved signals were seen for it.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                # Removal is the last expected signal; stop the main loop.
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # Returning False keeps this timeout from being rescheduled.
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1931
0e126c6d
JM
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    # dev[0] is the station under test; apdev is unused.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the 1st, 2nd and 3rd allocation under wpas_dbus_handler_add_blob;
    # alloc_fail_dbus() is given "AddBlob" as the operation being checked.
    for count in range(1, 4):
        with alloc_fail_dbus(dev[0], count, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
1941
2ec82e67
JM
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    # dev[0] is the station under test; apdev is unused.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Issue AutoScan() with each argument in turn; none of the calls is
    # expected to raise.
    for autoscan_arg in ("foo", "periodic:1", ""):
        iface.AutoScan(autoscan_arg)
    # Final AUTOSCAN request with an empty parameter over the control
    # interface.
    dev[0].request("AUTOSCAN ")
1951
0e126c6d
JM
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    # dev[0] is the station under test; apdev is unused.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    autoscan_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation under wpas_dbus_handler_autoscan while
    # calling AutoScan().
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
        autoscan_iface.AutoScan("foo")
    # Final AUTOSCAN request with an empty parameter over the control
    # interface.
    dev[0].request("AUTOSCAN ")
1960
2ec82e67
JM
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    # dev[0] is driven over D-Bus, dev[1] provides the peer address and
    # apdev[0] hosts the open AP that connect_2sta_open() is given.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)
    peer_addr = dev[1].p2p_interface_addr()

    def expect_error(method, arg, expected, accepted_msg, unexpected_msg):
        # Call method(arg) and require a DBusException containing 'expected'.
        try:
            method(arg)
        except dbus.exceptions.DBusException as err:
            if expected not in str(err):
                raise Exception(unexpected_msg + str(err))
        else:
            raise Exception(accepted_msg)

    expect_error(iface.TDLSDiscover, "foo", "InvalidArgs",
                 "Invalid TDLSDiscover() accepted",
                 "Unexpected error message for invalid TDLSDiscover: ")

    expect_error(iface.TDLSStatus, "foo", "InvalidArgs",
                 "Invalid TDLSStatus() accepted",
                 "Unexpected error message for invalid TDLSStatus: ")

    # No TDLS link has been set up with the peer at this point.
    status = iface.TDLSStatus(peer_addr)
    if status != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    expect_error(iface.TDLSSetup, "foo", "InvalidArgs",
                 "Invalid TDLSSetup() accepted",
                 "Unexpected error message for invalid TDLSSetup: ")

    expect_error(iface.TDLSTeardown, "foo", "InvalidArgs",
                 "Invalid TDLSTeardown() accepted",
                 "Unexpected error message for invalid TDLSTeardown: ")

    # Well-formed address, but not a known TDLS peer.
    expect_error(iface.TDLSTeardown, "00:11:22:33:44:55",
                 "UnknownError: error performing TDLS teardown",
                 "TDLSTeardown accepted for unknown peer",
                 "Unexpected error message: ")
2008
0e126c6d
JM
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    # dev[0] is the station under test; apdev is unused.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    tdls_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation under wpa_tdls_add_peer while calling
    # TDLSSetup(); the expected error text is passed to alloc_fail_dbus().
    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                         "UnknownError: error performing TDLS setup"):
        tdls_iface.TDLSSetup("00:11:22:33:44:55")
2017
2ec82e67
JM
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    # dev[0] is driven over D-Bus, dev[1] is the TDLS peer and apdev[0]
    # hosts the open AP that connect_2sta_open() is given.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Chains four timer callbacks (discover, setup, status + teardown,
        # final status) and records whether the link came up and went away.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Only logged; the signal does not affect the test verdict.
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            # Step 1: discovery, then schedule setup 100 ms later.
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            # Step 2: setup, then schedule the status check 500 ms later.
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            # Step 3: record the status, tear down, re-check 200 ms later.
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            # Step 4: record the post-teardown status and stop the loop.
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as tdls_test:
        if not tdls_test.success():
            raise Exception("Expected signals not seen")
2084
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    # dev[0] is the station under test; apdev is unused.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def read_prop(name):
        # Fetch one property of the interface object.
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    # With a bogus engine path the call may succeed or fail; a failure is
    # only accepted when it carries the EAPOL reinit error.
    for engine, module in (("foo", "bar"), ("foo", "")):
        try:
            iface.SetPKCS11EngineAndModulePath(engine, module)
        except dbus.exceptions.DBusException as err:
            if "Error.Failed: Reinit of the EAPOL" not in str(err):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(err))

    # With an empty engine path the call must succeed and both properties
    # must reflect what was set.
    for module in ("bar", ""):
        iface.SetPKCS11EngineAndModulePath("", module)
        value = read_prop("PKCS11EnginePath")
        if value != "":
            raise Exception("Unexpected PKCS11EnginePath value: " + value)
        value = read_prop("PKCS11ModulePath")
        if value != module:
            raise Exception("Unexpected PKCS11ModulePath value: " + value)
2121
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    # Wrapper around _test_dbus_apscan() that restores AP_SCAN 1 on dev[0]
    # whether or not the checks pass.
    restore_cmd = "AP_SCAN 1"
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        dev[0].request(restore_cmd)
2128
def _test_dbus_apscan(dev, apdev):
    """Check the ApScan property: initial value, valid sets, invalid sets.

    dev[0] is the station under test; apdev is unused. Raises Exception on
    any unexpected value or D-Bus error.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def read_ap_scan():
        return if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_ap_scan(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    current = read_ap_scan()
    if current != 1:
        raise Exception("Unexpected initial ApScan value: %d" % current)

    # Each of 0, 1 and 2 must be accepted and read back unchanged.
    for wanted in range(3):
        write_ap_scan(dbus.UInt32(wanted))
        current = read_ap_scan()
        if current != wanted:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (current, wanted))

    # Wrong D-Bus type.
    try:
        write_ap_scan(dbus.Int16(-1))
    except dbus.exceptions.DBusException as err:
        if "Error.Failed: wrong property type" not in str(err):
            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(err))
    else:
        raise Exception("Invalid Set(ApScan,-1) accepted")

    # Right type, out-of-range value.
    try:
        write_ap_scan(dbus.UInt32(123))
    except dbus.exceptions.DBusException as err:
        if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(err):
            raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(err))
    else:
        raise Exception("Invalid Set(ApScan,123) accepted")

    # Put the value the test started from back in place.
    write_ap_scan(dbus.UInt32(1))
2163
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    # dev[0] is the station under test; apdev is unused. Raises Exception on
    # any unexpected property value or D-Bus error.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def get_fast_reauth():
        return if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def set_fast_reauth(value):
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    res = get_fast_reauth()
    if not res:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values must be accepted and read back unchanged.
    for i in [False, True]:
        set_fast_reauth(dbus.Boolean(i))
        res = get_fast_reauth()
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A non-boolean value must be rejected as a type error. The message
    # names FastReauth (it previously said ApScan).
    try:
        set_fast_reauth(dbus.Int16(-1))
        raise Exception("Invalid Set(FastReauth,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))

    # Leave the property at True, the value the initial check requires.
    set_fast_reauth(dbus.Boolean(True))
2191
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    # dev[0] is the station under test; apdev is unused. Raises Exception on
    # any unexpected property value or D-Bus error.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def get_prop(name):
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def set_prop(name, value):
        if_obj.Set(WPAS_DBUS_IFACE, name, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def expect_set_error(name, value, expected, accepted_msg, unexpected_msg):
        # Set() must fail with a DBusException containing 'expected'.
        try:
            set_prop(name, value)
        except dbus.exceptions.DBusException as e:
            if expected not in str(e):
                raise Exception(unexpected_msg + str(e))
        else:
            raise Exception(accepted_msg)

    # Valid values must read back unchanged. The expected value is spelled
    # out in the failure message; the old code formatted it with an
    # undefined name and raised NameError instead.
    expected_age = 179
    set_prop("BSSExpireAge", dbus.UInt32(expected_age))
    res = get_prop("BSSExpireAge")
    if res != expected_age:
        raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, expected_age))

    expected_count = 3
    set_prop("BSSExpireCount", dbus.UInt32(expected_count))
    res = get_prop("BSSExpireCount")
    if res != expected_count:
        raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, expected_count))

    expect_set_error("BSSExpireAge", dbus.Int16(-1),
                     "Error.Failed: wrong property type",
                     "Invalid Set(BSSExpireAge,-1) accepted",
                     "Unexpected error message for invalid Set(BSSExpireAge,-1): ")

    expect_set_error("BSSExpireAge", dbus.UInt32(9),
                     "Error.Failed: BSSExpireAge must be >= 10",
                     "Invalid Set(BSSExpireAge,9) accepted",
                     "Unexpected error message for invalid Set(BSSExpireAge,9): ")

    expect_set_error("BSSExpireCount", dbus.Int16(-1),
                     "Error.Failed: wrong property type",
                     "Invalid Set(BSSExpireCount,-1) accepted",
                     "Unexpected error message for invalid Set(BSSExpireCount,-1): ")

    expect_set_error("BSSExpireCount", dbus.UInt32(0),
                     "Error.Failed: BSSExpireCount must be > 0",
                     "Invalid Set(BSSExpireCount,0) accepted",
                     "Unexpected error message for invalid Set(BSSExpireCount,0): ")

    # Final values left in place for whatever runs next.
    set_prop("BSSExpireAge", dbus.UInt32(180))
    set_prop("BSSExpireCount", dbus.UInt32(2))
2246
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    # Wrapper around _test_dbus_country() that resets the country to 00, in
    # wpa_supplicant and through 'iw', whether or not the checks pass.
    reset_regdom_cmd = ['iw', 'reg', 'set', '00']
    try:
        _test_dbus_country(dev, apdev)
    finally:
        dev[0].request("SET country 00")
        subprocess.call(reset_regdom_cmd)
2254
def _test_dbus_country(dev, apdev):
    """Check the Country property and the regdom events that follow a change.

    dev[0] is the station under test; apdev is unused. Raises Exception on
    any unexpected value, event or D-Bus error.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def write_country(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Country", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def wait_regdom_change():
        # Return the regdom change event, raising when none is seen.
        event = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if event is None:
            # For now, work around separate P2P Device interface event
            # delivery
            event = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                             timeout=1)
            if event is None:
                raise Exception("regdom change event not seen")
        return event

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    dev[0].dump_monitor()

    write_country("FI")
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    ev = wait_regdom_change()
    if "init=USER type=COUNTRY alpha2=FI" not in ev:
        raise Exception("Unexpected event contents: " + ev)

    # Wrong D-Bus type.
    try:
        write_country(dbus.Int16(-1))
    except dbus.exceptions.DBusException as err:
        if "Error.Failed: wrong property type" not in str(err):
            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(err))
    else:
        raise Exception("Invalid Set(Country,-1) accepted")

    # Right type, but a single-character country code.
    try:
        write_country("F")
    except dbus.exceptions.DBusException as err:
        if "Error.Failed: invalid country code" not in str(err):
            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(err))
    else:
        raise Exception("Invalid Set(Country,F) accepted")

    write_country("00")

    ev = wait_regdom_change()
    # init=CORE was previously used due to invalid db.txt data for 00. For
    # now, allow both it and the new init=USER after fixed db.txt.
    world_markers = ("init=CORE type=WORLD", "init=USER type=WORLD")
    if not any(marker in ev for marker in world_markers):
        raise Exception("Unexpected event contents: " + ev)
2308
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    # Wrapper around _test_dbus_scan_interval() that issues SCAN_INTERVAL 5
    # on dev[0] whether or not the checks pass.
    restore_cmd = "SCAN_INTERVAL 5"
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        dev[0].request(restore_cmd)
2315
def _test_dbus_scan_interval(dev, apdev):
    """Check the ScanInterval property with valid and invalid values.

    dev[0] is the station under test; apdev is unused. Raises Exception on
    any unexpected property value or D-Bus error.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def set_scan_interval(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # A valid value must read back unchanged. The expected value is spelled
    # out in the failure message; the old code formatted it with an
    # undefined name and raised NameError instead.
    expected = 3
    set_scan_interval(dbus.Int32(expected))
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    # Wrong D-Bus type (UInt16 where the valid sets above use Int32).
    try:
        set_scan_interval(dbus.UInt16(100))
        raise Exception("Invalid Set(ScanInterval,100) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))

    # Right type, negative value.
    try:
        set_scan_interval(dbus.Int32(-1))
        raise Exception("Invalid Set(ScanInterval,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))

    # Final value left in place for whatever runs next.
    set_scan_interval(dbus.Int32(5))
2344
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    # dev[0] is driven over D-Bus; dev[1] runs a social-channel P2P find for
    # the duration of the test. apdev is unused.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        # Starts a P2P group, subscribes to Probe Request reporting on the
        # group interface and records whether a ProbeRequest signal arrives.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            # Subscribe on the interface object named in the signal.
            self.iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE)
            self.iface.SubscribeProbeReq()
            self.group_p2p = dbus.Interface(group_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            # One reported Probe Request is enough; stop the main loop.
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_dev = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            p2p_dev.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            return False

        def success(self):
            return self.reported

    # First round: unsubscribe explicitly and check that a repeated
    # unsubscribe is rejected, then drop the group.
    with TestDbusProbe(bus) as probe:
        if not probe.success():
            raise Exception("Expected signals not seen")
        probe.iface.UnsubscribeProbeReq()
        try:
            probe.iface.UnsubscribeProbeReq()
        except dbus.exceptions.DBusException as err:
            if "NoSubscription" not in str(err):
                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(err))
        else:
            raise Exception("Invalid UnsubscribeProbeReq() accepted")
        probe.group_p2p.Disconnect()

    # Second round.
    with TestDbusProbe(bus) as probe:
        if not probe.success():
            raise Exception("Expected signals not seen")
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
2ec82e67 2408
0e126c6d
JM
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    # dev[0] is the station under test; apdev is unused.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Need to make sure this process has not already subscribed to avoid false
    # failures due to the operation succeeding due to os_strdup() not even
    # getting called.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException:
        had_subscription = False
    else:
        had_subscription = True

    # Fail the first allocation under wpas_dbus_handler_subscribe_preq while
    # subscribing.
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
                         "SubscribeProbeReq"):
        iface.SubscribeProbeReq()

    if had_subscription:
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.
        iface.SubscribeProbeReq()
2432
2ec82e67
JM
2433def test_dbus_p2p_invalid(dev, apdev):
2434 """D-Bus invalid P2P operations"""
910eb5b5 2435 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2436 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2437
2438 try:
2439 p2p.RejectPeer(path + "/Peers/00112233445566")
2440 raise Exception("Invalid RejectPeer accepted")
2441 except dbus.exceptions.DBusException, e:
2442 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2443 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2444
2445 try:
2446 p2p.RejectPeer("/foo")
2447 raise Exception("Invalid RejectPeer accepted")
2448 except dbus.exceptions.DBusException, e:
2449 if "InvalidArgs" not in str(e):
2450 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2451
001c4bf5
JM
2452 tests = [ { },
2453 { 'peer': 'foo' },
2454 { 'foo': "bar" },
2455 { 'iface': "abc" },
2456 { 'iface': 123 } ]
2457 for t in tests:
2458 try:
2459 p2p.RemoveClient(t)
2460 raise Exception("Invalid RemoveClient accepted")
2461 except dbus.exceptions.DBusException, e:
2462 if "InvalidArgs" not in str(e):
2463 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
2464
2ec82e67
JM
2465 tests = [ {'DiscoveryType': 'foo'},
2466 {'RequestedDeviceTypes': 'foo'},
2467 {'RequestedDeviceTypes': ['foo']},
2468 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2469 '10','11','12','13','14','15','16',
2470 '17']},
2471 {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2472 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2473 {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2474 {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
2475 dbus.ByteArray('1234567')]},
2476 {'Foo': dbus.Int16(1)},
2477 {'Foo': dbus.UInt16(1)},
2478 {'Foo': dbus.Int64(1)},
2479 {'Foo': dbus.UInt64(1)},
2480 {'Foo': dbus.Double(1.23)},
2481 {'Foo': dbus.Signature('s')},
2482 {'Foo': 'bar'}]
2483 for t in tests:
2484 try:
2485 p2p.Find(dbus.Dictionary(t))
2486 raise Exception("Invalid Find accepted")
2487 except dbus.exceptions.DBusException, e:
2488 if "InvalidArgs" not in str(e):
2489 raise Exception("Unexpected error message for invalid Find(): " + str(e))
2490
2491 for p in [ "/foo",
2492 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2493 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2494 try:
2495 p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2496 raise Exception("Invalid RemovePersistentGroup accepted")
2497 except dbus.exceptions.DBusException, e:
2498 if "InvalidArgs" not in str(e):
2499 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
2500
2501 try:
2502 dev[0].request("P2P_SET disabled 1")
2503 p2p.Listen(5)
2504 raise Exception("Invalid Listen accepted")
2505 except dbus.exceptions.DBusException, e:
2506 if "UnknownError: Could not start P2P listen" not in str(e):
2507 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2508 finally:
2509 dev[0].request("P2P_SET disabled 0")
2510
2511 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2512 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2513 try:
2514 test_p2p.Listen("foo")
2515 raise Exception("Invalid Listen accepted")
2516 except dbus.exceptions.DBusException, e:
2517 if "InvalidArgs" not in str(e):
2518 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2519
2520 try:
2521 dev[0].request("P2P_SET disabled 1")
2522 p2p.ExtendedListen(dbus.Dictionary({}))
2523 raise Exception("Invalid ExtendedListen accepted")
2524 except dbus.exceptions.DBusException, e:
2525 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2526 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2527 finally:
2528 dev[0].request("P2P_SET disabled 0")
2529
2530 try:
2531 dev[0].request("P2P_SET disabled 1")
2532 args = { 'duration1': 30000, 'interval1': 102400,
2533 'duration2': 20000, 'interval2': 102400 }
2534 p2p.PresenceRequest(args)
2535 raise Exception("Invalid PresenceRequest accepted")
2536 except dbus.exceptions.DBusException, e:
2537 if "UnknownError: Failed to invoke presence request" not in str(e):
2538 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2539 finally:
2540 dev[0].request("P2P_SET disabled 0")
2541
2542 try:
2543 params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
2544 p2p.GroupAdd(params)
2545 raise Exception("Invalid GroupAdd accepted")
2546 except dbus.exceptions.DBusException, e:
2547 if "InvalidArgs" not in str(e):
2548 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2549
2550 try:
2551 params = dbus.Dictionary({'persistent_group_object':
2552 dbus.ObjectPath(path),
2553 'frequency': 2412})
2554 p2p.GroupAdd(params)
2555 raise Exception("Invalid GroupAdd accepted")
2556 except dbus.exceptions.DBusException, e:
2557 if "InvalidArgs" not in str(e):
2558 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2559
2560 try:
2561 p2p.Disconnect()
2562 raise Exception("Invalid Disconnect accepted")
2563 except dbus.exceptions.DBusException, e:
2564 if "UnknownError: failed to disconnect" not in str(e):
2565 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
2566
2567 try:
2568 dev[0].request("P2P_SET disabled 1")
2569 p2p.Flush()
2570 raise Exception("Invalid Flush accepted")
2571 except dbus.exceptions.DBusException, e:
2572 if "Error.Failed: P2P is not available for this interface" not in str(e):
2573 raise Exception("Unexpected error message for invalid Flush: " + str(e))
2574 finally:
2575 dev[0].request("P2P_SET disabled 0")
2576
2577 try:
2578 dev[0].request("P2P_SET disabled 1")
2579 args = { 'peer': path,
2580 'join': True,
2581 'wps_method': 'pbc',
2582 'frequency': 2412 }
2583 pin = p2p.Connect(args)
2584 raise Exception("Invalid Connect accepted")
2585 except dbus.exceptions.DBusException, e:
2586 if "Error.Failed: P2P is not available for this interface" not in str(e):
2587 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2588 finally:
2589 dev[0].request("P2P_SET disabled 0")
2590
2591 tests = [ { 'frequency': dbus.Int32(-1) },
2592 { 'wps_method': 'pbc' },
2593 { 'wps_method': 'foo' } ]
2594 for args in tests:
2595 try:
2596 pin = p2p.Connect(args)
2597 raise Exception("Invalid Connect accepted")
2598 except dbus.exceptions.DBusException, e:
2599 if "InvalidArgs" not in str(e):
2600 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2601
2602 try:
2603 dev[0].request("P2P_SET disabled 1")
2604 args = { 'peer': path }
2605 pin = p2p.Invite(args)
2606 raise Exception("Invalid Invite accepted")
2607 except dbus.exceptions.DBusException, e:
2608 if "Error.Failed: P2P is not available for this interface" not in str(e):
2609 raise Exception("Unexpected error message for invalid Invite: " + str(e))
2610 finally:
2611 dev[0].request("P2P_SET disabled 0")
2612
2613 try:
2614 args = { 'foo': 'bar' }
2615 pin = p2p.Invite(args)
2616 raise Exception("Invalid Invite accepted")
2617 except dbus.exceptions.DBusException, e:
2618 if "InvalidArgs" not in str(e):
2619 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2620
2621 tests = [ (path, 'display', "InvalidArgs"),
2622 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2623 'display',
2624 "UnknownError: Failed to send provision discovery request"),
2625 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2626 'keypad',
2627 "UnknownError: Failed to send provision discovery request"),
2628 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2629 'pbc',
2630 "UnknownError: Failed to send provision discovery request"),
2631 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2632 'pushbutton',
2633 "UnknownError: Failed to send provision discovery request"),
2634 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2635 'foo', "InvalidArgs") ]
2636 for (p,method,err) in tests:
2637 try:
2638 p2p.ProvisionDiscoveryRequest(p, method)
2639 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2640 except dbus.exceptions.DBusException, e:
2641 if err not in str(e):
2642 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
2643
2644 try:
2645 dev[0].request("P2P_SET disabled 1")
2646 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2647 dbus_interface=dbus.PROPERTIES_IFACE)
2648 raise Exception("Invalid Get(Peers) accepted")
2649 except dbus.exceptions.DBusException, e:
2650 if "Error.Failed: P2P is not available for this interface" not in str(e):
2651 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
2652 finally:
2653 dev[0].request("P2P_SET disabled 0")
2654
0e126c6d
JM
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Each entry: (allocation count, function/backtrace pattern handed to
    # alloc_fail_dbus, value stored under the 'Foo' key of the Find()
    # argument dictionary).
    string_array = "_wpa_dbus_dict_entry_get_string_array"
    find_cases = [
        (1, string_array, ['bar']),
        (2, string_array, ['bar']),
        (10, string_array, ['1', '2', '3', '4', '5', '6', '7', '8', '9']),
        (1, ":=_wpa_dbus_dict_entry_get_binarray",
         [dbus.ByteArray('123')]),
        (1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         [dbus.ByteArray('123')]),
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         [dbus.ByteArray('123') for _ in range(11)]),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         [dbus.ByteArray('123')]),
        (1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         path),
    ]
    for count, funcs, value in find_cases:
        with alloc_fail_dbus(dev[0], count, funcs, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({'Foo': value}))

    # The same AddService() arguments are used for both allocation counts.
    service_args = {'service_type': 'bonjour',
                    'response': dbus.ByteArray(500*'b')}
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(service_args)
2711
2ec82e67
JM
def test_dbus_p2p_discovery(dev, apdev):
    """D-Bus P2P discovery"""
    try:
        _test_dbus_p2p_discovery(dev, apdev)
    finally:
        # Make sure the vendor element added to dev[1] during the test does
        # not remain configured if the test fails before its own cleanup.
        dev[1].request("VENDOR_ELEM_REMOVE 1 *")

def _test_dbus_p2p_discovery(dev, apdev):
    # Body of test_dbus_p2p_discovery(): dev[0] is driven over D-Bus while
    # dev[1] and dev[2] listen as discoverable P2P peers.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # dev[1]: peer with a secondary device type and a vendor element
    dev[1].request("SET sec_device_type 1-0050F204-2")
    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()
    a1 = binascii.unhexlify(addr1.replace(':',''))

    # dev[2]: peer with a Wi-Fi Display subelement configured
    wfd_devinfo = "00001c440028"
    dev[2].request("SET wifi_display 1")
    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
    wfd = binascii.unhexlify('000006' + wfd_devinfo)
    dev[2].p2p_listen()
    addr2 = dev[2].p2p_dev_addr()
    a2 = binascii.unhexlify(addr2.replace(':',''))

    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if 'Peers' not in res:
        raise Exception("GetAll result missing Peers")
    if len(res['Peers']) != 0:
        raise Exception("Unexpected peer(s) in the list")

    args = {'DiscoveryType': 'social',
            'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
            'Timeout': dbus.Int32(1) }
    p2p.Find(dbus.Dictionary(args))
    p2p.StopFind()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False
            self.found2 = False
            self.found_prop = False
            self.lost = False
            self.find_stopped = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.deviceFoundProperties,
                            WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceLost")
            self.add_signal(self.provisionDiscoveryResponseEnterPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryResponseEnterPin")
            self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
                            "FindStopped")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if len(res) < 1:
                raise Exception("Unexpected number of peers")
            if path not in res:
                raise Exception("Mismatch in peer object path")
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("peer properties: " + str(res))

            if res['DeviceAddress'] == a1:
                if 'SecondaryDeviceTypes' not in res:
                    raise Exception("Missing SecondaryDeviceTypes")
                sec = res['SecondaryDeviceTypes']
                if len(sec) < 1:
                    raise Exception("Secondary device type missing")
                if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
                    raise Exception("Secondary device type mismatch")

                if 'VendorExtension' not in res:
                    raise Exception("Missing VendorExtension")
                vendor = res['VendorExtension']
                if len(vendor) < 1:
                    raise Exception("Vendor extension missing")
                if "\x11\x22\x33\x44" not in vendor:
                    raise Exception("Vendor extension mismatch")

                self.found = True
            elif res['DeviceAddress'] == a2:
                if 'IEs' not in res:
                    raise Exception("IEs missing")
                if res['IEs'] != wfd:
                    raise Exception("IEs mismatch")
                self.found2 = True
            else:
                raise Exception("Unexpected peer device address")

            # Once both peers have been seen, stop the search and continue
            # with the peer that was reported last.
            if self.found and self.found2:
                p2p.StopFind()
                p2p.RejectPeer(path)
                p2p.ProvisionDiscoveryRequest(path, 'display')

        def deviceLost(self, path):
            logger.debug("deviceLost: path=%s" % path)
            self.lost = True
            # The peer entry is already gone, so rejecting it has to fail.
            try:
                p2p.RejectPeer(path)
                raise Exception("Invalid RejectPeer accepted")
            except dbus.exceptions.DBusException as e:
                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
            self.loop.quit()

        def deviceFoundProperties(self, path, properties):
            logger.debug("deviceFoundProperties: path=%s" % path)
            logger.debug("peer properties: " + str(properties))
            if properties['DeviceAddress'] == a1:
                self.found_prop = True

        def provisionDiscoveryResponseEnterPin(self, peer_object):
            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
            p2p.Flush()

        def findStopped(self):
            logger.debug("findStopped")
            self.find_stopped = True

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.found and self.lost and self.found2 and self.find_stopped

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
    dev[1].p2p_stop_find()

    # Verify that dev[0] is discoverable while in Listen state
    p2p.Listen(1)
    dev[2].p2p_stop_find()
    dev[2].request("P2P_FLUSH")
    if not dev[2].discover_peer(addr0):
        raise Exception("Peer not found")
    p2p.StopFind()
    dev[2].p2p_stop_find()

    try:
        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))

    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
    p2p.ExtendedListen(dbus.Dictionary({}))
    dev[0].global_request("P2P_EXT_LISTEN")
2877
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))

    args = { 'service_type': 'bonjour',
             'query': bonjour_query,
             'response': bonjour_response }
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # DeleteService() with the AddService() arguments (including
    # 'response') is expected to be rejected.
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    args = { 'service_type': 'bonjour',
             'query': bonjour_query }
    p2p.DeleteService(args)
    # Deleting the same service a second time has to fail.
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
    p2p.AddService(args)
    p2p.DeleteService(args)
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # Invalid DeleteService() argument combinations
    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'foo', 'query': bonjour_query },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'version': 0x10,
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        try:
            p2p.DeleteService(args)
            raise Exception("Invalid DeleteService() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # Invalid AddService() argument combinations
    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'version': 0x10,
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'response': 'foo' },
              { 'service_type': 'bonjour', 'query': bonjour_query },
              { 'service_type': 'bonjour', 'response': bonjour_response },
              { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        try:
            p2p.AddService(args)
            raise Exception("Invalid AddService() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddService(): " + str(e))

    args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    # The reference was already cancelled, so a second cancel has to fail.
    try:
        p2p.ServiceDiscoveryCancelRequest(ref)
        raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ServiceDiscoveryCancelRequest(): " + str(e))
    try:
        p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
        raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ServiceDiscoveryCancelRequest(): " + str(e))

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'ssdp:foo' }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    # Invalid ServiceDiscoveryRequest() argument combinations
    tests = [ { 'service_type': 'foo' },
              { 'foo': 'bar' },
              { 'tlv': 'foo' },
              { },
              { 'version': 0 },
              { 'service_type': 'upnp',
                'service': 'ssdp:foo' },
              { 'service_type': 'upnp',
                'version': 0x10 },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers") },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': path + "/Peers" },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
    for args in tests:
        try:
            p2p.ServiceDiscoveryRequest(args)
            raise Exception("Invalid ServiceDiscoveryRequest accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))

    args = { 'foo': 'bar' }
    try:
        p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
        raise Exception("Invalid ServiceDiscoveryResponse accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
3026
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    # dev[1] advertises a Bonjour service and listens so dev[0] can find it
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once a ServiceDiscoveryResponse signal has been received
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Send a service discovery query to the peer that was just found
            query = {'peer_object': path,
                     'tlv': dbus.ByteArray("\x02\x00\x00\x01")}
            p2p.ServiceDiscoveryRequest(query)

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social',
                                           'Timeout': dbus.Int32(10)})
            p2p.Find(find_params)
            # One-shot timeout handler
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as tester:
        if not tester.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
3078
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        # Always switch external service discovery processing back off on
        # dev[0], whether or not the test body succeeded.
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3085
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    # Body of test_dbus_p2p_service_discovery_external(): dev[1] sends a
    # service discovery request and dev[0] answers it over D-Bus.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hex dump of the response TLVs dev[0] sends back
    resp = "0300000101"

    dev[1].request("P2P_FLUSH")
    dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    dev[1].p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once a ServiceDiscoveryRequest signal has been handled
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryRequest")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Echo the request identifiers back together with the response
            response = {'peer_object': sd_request['peer_object'],
                        'frequency': sd_request['frequency'],
                        'dialog_token': sd_request['dialog_token'],
                        'tlvs': dbus.ByteArray(binascii.unhexlify(resp))}
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(response,
                                                         signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            # One-shot timeout handler
            return False

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as tester:
        if not tester.success():
            raise Exception("Expected signals not seen")

    # dev[1] has to receive the response that was sent through D-Bus
    ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    fields = ev.split(' ')
    if fields[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    dev[1].p2p_stop_find()

    p2p.StopFind()
    p2p.ServiceDiscoveryExternal(0)
3152
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            # Set before every raise in a signal handler so that success()
            # reports the failure.
            self.exceptions = False
            self.deauthorized = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self.exceptions = True
                raise Exception("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self.exceptions = True
                raise Exception("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                self.exceptions = True
                raise Exception("Unexpected PeerGO value: " + str(go))
            if self.first:
                # First group instance: tear it down so that the persistent
                # group gets re-started from groupFinished().
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                # Re-started group: have the second device join it
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object

            # WPS.Start() with both P2PDeviceAddress and Bssid (and no Pin)
            # is expected to be rejected.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin' }
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            try:
                wps.Start(params)
                self.exceptions = True
                raise Exception("Invalid WPS.Start() accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message: " + str(e))
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self.exceptions = True
                raise Exception("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of group members")

            ext = dbus.ByteArray("\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != ext:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted as
            # well.
            res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value read back (res2), not the local list that was
            # just extended; this also guards the indexing below.
            if len(res2) != 2:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # Grow the list to 12 entries; setting it is expected to fail.
            for i in range(10):
                res.append(dbus.ByteArray('\xaa\xbb'))
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            vals = [ "foo" ]
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            vals = [ [ "foo" ] ]
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            p2p.RemoveClient({ 'peer': self.peer_path })

            # The next GroupFinished signal should remove the persistent
            # group instead of re-starting it.
            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
3414
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # Event-driven scenario: dev[0] starts an autonomous GO over D-Bus, wlan1
    # joins it using PBC, and the group is torn down once the station has
    # been authorized. The test passes when GroupFinished is seen.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            # Set when the GroupFinished signal has been received.
            self.done = False

        def __enter__(self):
            # Kick off the scenario almost immediately and schedule
            # self.timeout (inherited from TestDbus) after 15000 ms.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.provisionDiscoveryPBCRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryPBCRequest")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            # Blocks until one of the handlers (or the timeout) quits the
            # main loop.
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            # Keep the group interface object; the WPS and Disconnect calls
            # below are issued against it.
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Cache the peer properties; DeviceAddress is needed when the
            # provision discovery request arrives.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            # The peer address comes from the properties cached in
            # deviceFound(); no need to re-derive it from the object path.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Type': 'pbc' }
            logger.info("Authorize peer to connect to the group")
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # The station got in; tear the group down to end the test.
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'frequency': 2412})
            p2p.GroupAdd(params)
            # Returning False makes this a one-shot glib timeout callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
3504
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # dev[0] starts an autonomous GO over D-Bus; wlan1 then connects to it
    # with a plain WPS PIN exchange instead of P2P provisioning.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Flipped to True by the GroupFinished handler.
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # Register the signal handlers in a fixed order, then run the
            # main loop until a handler or the timeout stops it.
            subscriptions = [
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            ]
            for handler, iface, signal_name in subscriptions:
                self.add_signal(handler, iface, signal_name)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Read the BSSID of the new group and format it as a
            # colon-separated hex string.
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(
                WPAS_DBUS_GROUP,
                dbus_interface=dbus.PROPERTIES_IFACE,
                byte_arrays=True)
            bssid = ':'.join(map(binascii.hexlify, group_props['BSSID']))

            wps_pin = '12345670'
            wps_params = {
                'Role': 'enrollee',
                'Type': 'pin',
                'Pin': wps_pin,
            }
            iface_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            dbus.Interface(iface_obj, WPAS_DBUS_IFACE_WPS).Start(wps_params)

            # Have the station find the GO and start WPS with the same PIN.
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.request("WPS_PIN " + bssid + " " + wps_pin)
            # Kept for the Disconnect() call in staAuthorized().
            self.group_p2p = dbus.Interface(iface_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # Drop the station first, then remove the group itself.
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # One-shot glib timeout callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3574
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    # Remember the GO's group interface name so that fresh WpaSupplicant
    # instances created inside the signal handlers can address it.
    dev1_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    # dev[0] discovers both the GO (dev[1]) and a third device (dev[2]) over
    # D-Bus, joins the GO with a PIN, checks the reported client properties
    # and finally invites dev[2] to the group.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # D-Bus object paths of dev[2] and of the GO once discovered.
            self.peer = None
            self.go = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationResult")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # Peers are identified by their address (without colons)
            # appearing in the object path.
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path
            if self.peer and self.go:
                logger.info("Join the group")
                p2p.StopFind()
                args = { 'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'frequency': 2412 }
                # Connect() returns the PIN that the GO has to accept.
                pin = p2p.Connect(args)

                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                dev1.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            args = { 'duration1': 30000, 'interval1': 102400,
                     'duration2': 20000, 'interval2': 102400 }
            group_p2p.PresenceRequest(args)

            # Invite the third device; the result arrives through the
            # InvitationResult signal.
            args = { 'peer': self.peer }
            group_p2p.Invite(args)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Have the GO remove the group, which should in turn produce
            # GroupFinished on dev[0].
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_ifname = dev1_group_ifname
            dev1.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot glib timeout callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
3699
1992af11
JM
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # form() is imported from p2p_utils; presumably it sets up a persistent
    # group between the two devices (peer['persistent'] is used below).
    form(dev[0], dev[1])
    addr0 = dev[0].p2p_dev_addr()
    dev[0].p2p_listen()
    dev[0].global_request("SET persistent_reconnect 0")

    peer_found = dev[1].discover_peer(addr0, social=True)
    if not peer_found:
        raise Exception("Peer " + addr0 + " not found")
    peer = dev[1].get_peer(addr0)

    # wlan1 invites dev[0] to the persistent group; the test passes when
    # dev[0] reports the InvitationReceived signal over D-Bus.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True once InvitationReceived has been delivered.
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.invitationReceived,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationReceived")
            self.loop.run()
            return self

        def invitationReceived(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            inviter = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            inviter.global_request("P2P_INVITE persistent=" +
                                   peer['persistent'] + " peer=" + addr0)
            # One-shot glib timeout callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    for d in (dev[0], dev[1]):
        d.p2p_stop_find()
3748
2ec82e67
JM
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    # The real test body changes SsidPostfix; reset it afterwards no matter
    # how the test ends.
    reset_cmd = "P2P_SET ssid_postfix "
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        dev[0].request(reset_cmd)
3755
def _test_dbus_p2p_config(dev, apdev):
    # Body of test_dbus_p2p_config: exercises Get/Set of the P2PDeviceConfig
    # property, including invalid values and the P2P-disabled case.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Writing back the value that was just read must not change anything.
    res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                     dbus_interface=dbus.PROPERTIES_IFACE,
                     byte_arrays=True)
    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
               dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                      dbus_interface=dbus.PROPERTIES_IFACE,
                      byte_arrays=True)

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    # Add one vendor extension and one secondary device type.
    changes = { 'SsidPostfix': 'foo',
                'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
                'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
               dbus.Dictionary(changes, signature='sv'),
               dbus_interface=dbus.PROPERTIES_IFACE)

    res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                      dbus_interface=dbus.PROPERTIES_IFACE,
                      byte_arrays=True)
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Setting empty arrays is expected to remove the entries again.
    changes = { 'SsidPostfix': '',
                'VendorExtension': dbus.Array([], signature="ay"),
                'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
               dbus.Dictionary(changes, signature='sv'),
               dbus_interface=dbus.PROPERTIES_IFACE)

    res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                      dbus_interface=dbus.PROPERTIES_IFACE,
                      byte_arrays=True)
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # With P2P disabled, both Get and Set must be rejected. P2P is
    # re-enabled in the finally clauses regardless of the outcome.
    try:
        dev[0].request("P2P_SET disabled 1")
        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                   dbus_interface=dbus.PROPERTIES_IFACE,
                   byte_arrays=True)
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    try:
        dev[0].request("P2P_SET disabled 1")
        changes = { 'SsidPostfix': 'foo' }
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                   dbus.Dictionary(changes, signature='sv'),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Wrong value types and an unknown key must all yield InvalidArgs.
    tests = [ { 'DeviceName': 123 },
              { 'SsidPostfix': 123 },
              { 'Foo': 'Bar' } ]
    for changes in tests:
        try:
            if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                       dbus.Dictionary(changes, signature='sv'),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
3844
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # First phase: create a persistent group over D-Bus, record the object
    # path reported by PersistentGroupAdded and stop the group again.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path from PersistentGroupAdded; stays None if the
            # signal is never delivered.
            self.persistent = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Only the persistent group entry is of interest; stop the
            # running group right away.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # One-shot glib timeout callback.
            return False

        def success(self):
            # The rest of the test needs the persistent group path, so
            # report failure here instead of hitting an AttributeError on
            # t.persistent below.
            return self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Second phase: modify the persistent group properties and verify that
    # only the ssid value changed.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    # The ssid is reported back with surrounding double quotes.
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # Third phase: add a second persistent group, then remove all of them.
    args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
                             'psk': '1234567890' }, signature='sv')
    group = p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The original group is gone now, so removing it again must fail.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
3937
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # Two rounds: first a persistent group is created and wlan1 joins it
    # with a PIN; after that group ends, dev[0] re-invokes the persistent
    # group with Invite(). The test passes when the second group finishes.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False
            # True once the persistent group has been re-invoked.
            self.invited = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if not self.invited:
                # First round: have wlan1 find the GO and join with a PIN.
                g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
                res = g_obj.GetAll(WPAS_DBUS_GROUP,
                                   dbus_interface=dbus.PROPERTIES_IFACE,
                                   byte_arrays=True)
                bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.scan_for_bss(bssid, freq=2412)
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                self.done = True
                self.loop.quit()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("SET persistent_reconnect 1")
                dev1.p2p_listen()

                # 'path' is the interface object path obtained from
                # prepare_dbus(), not a persistent group object, so this
                # Invite() is expected to be rejected.
                args = { 'persistent_group_object': dbus.ObjectPath(path),
                         'peer': self.peer_path }
                try:
                    p2p.Invite(args)
                    raise Exception("Invalid Invite accepted")
                except dbus.exceptions.DBusException as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected error message for invalid Invite: " + str(e))

                args = { 'persistent_group_object': self.persistent,
                         'peer': self.peer_path }
                p2p.Invite(args)
                self.invited = True

                # Saved for dev1.group_form_result() in staAuthorized().
                self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                           timeout=15)
                if self.sta_group_ev is None:
                    raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Cache the peer properties; DeviceAddress is used when the
            # provision discovery request arrives.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            # The peer address comes from the properties cached in
            # deviceFound(); no need to re-derive it from the object path.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            # Saved for dev1.group_form_result() in staAuthorized().
            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # Remove the group on the client side first, then on the GO.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
            if ev is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # One-shot glib timeout callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4076
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    # wlan1 initiates GO Negotiation towards dev[0]; dev[0] answers through
    # D-Bus Connect() once GONegotiationRequest is received, and the group
    # is disconnected as soon as it has started.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when the GroupFinished signal has been received.
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationRequest",
                            byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            # First try with frequency 5175, which is expected to be
            # rejected with ConnectChannelUnsupported.
            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15, 'persistent': False, 'frequency': 5175 }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            # Then the same request without a forced frequency.
            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15, 'persistent': False }
            p2p.Connect(args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Group formation worked; tear it down to end the test.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            # One-shot glib timeout callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4158
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    # dev[0] pre-authorizes GO Negotiation with the discovered peer
    # (authorize_only) and wlan1 then initiates it. The test requires the
    # PeerJoined and PeerDisconnected group signals as well as
    # GroupFinished.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Keypad method without a PIN is expected to be rejected.
            args = { 'peer': path, 'wps_method': 'keypad',
                     'go_intent': 15, 'authorize_only': True }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 15, 'authorize_only': True }
            p2p.Connect(args)
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Saved for dev1.group_form_result() in groupStarted().
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            # Have the client leave; the resulting StaDeauthorized signal
            # triggers the GO-side Disconnect().
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def staDeauthorized(self, name):
            logger.debug("staDeuthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot glib timeout callback.
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4259
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # dev[0] is driven through D-Bus and initiates GO Negotiation; dev[1]
    # (wlan1) is driven through the control interface and listens.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Schedule the test start and the overall timeout, subscribe to
            # the signals of interest and then block in the main loop until
            # one of the handlers (or the timeout) stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            # Initiate GO Negotiation over D-Bus with go_intent 0; the peer
            # replies below with go_intent=15.
            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 0}
            p2p.Connect(args)

            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 +
                                " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Stored so that groupStarted() can feed it to group_form_result().
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Disconnect the local group through its own interface object,
            # then tear the group down on the peer as well.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            # Only treat an empty Groups list as "group removed" once a group
            # has been seen for the peer. An empty update arriving earlier
            # would otherwise stop the loop before success() can be true.
            # This matches test_dbus_p2p_group_termination_by_go.
            if len(changed_properties["Groups"]) == 0 and self.peer_group_added:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the GLib timeout from firing again.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4349
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    # dev[0] is controlled over D-Bus; dev[1] (wlan1) is controlled over the
    # control interface and removes the group once it has formed.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Arm the start/timeout callbacks, register signal handlers and
            # run the main loop until a handler or the timeout stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = {
                'peer': path,
                'wps_method': 'keypad',
                'pin': '12345670',
                'go_intent': 0,
            }
            p2p.Connect(connect_args)

            event = peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                               timeout=15)
            if event is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            event = peer_dev.wait_global_event(["P2P-GROUP-STARTED"],
                                               timeout=15)
            if event is None:
                raise Exception("Group formation timed out")
            # Kept for group_form_result() in groupStarted().
            self.sta_group_ev = event

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The proxy object is looked up but not used further here.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            # The peer side removes the group.
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer_dev.group_form_result(self.sta_group_ev)
            peer_dev.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only Groups updates on Peer objects are of interest.
            if (interface_name != WPAS_DBUS_P2P_PEER or
                    "Groups" not in changed_properties):
                return
            groups = changed_properties["Groups"]
            if len(groups) > 0:
                self.peer_group_added = True
            elif self.peer_group_added:
                # Empty list after a group was seen: the group is gone.
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot callback: False removes the GLib timeout source.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4437
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    # Run the actual test with p2p_group_idle set to 1 and always restore
    # the value to 0 afterwards, even if the test fails.
    try:
        dev[0].global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        dev[0].global_request("SET p2p_group_idle 0")
4445
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    # Body of test_dbus_p2p_group_idle_timeout; the caller sets and restores
    # the p2p_group_idle parameter around this function.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.group_started = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Arm the start/timeout callbacks, register signal handlers and
            # run the main loop until a handler or the timeout stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = {
                'peer': path,
                'wps_method': 'keypad',
                'pin': '12345670',
                'go_intent': 0,
            }
            p2p.Connect(connect_args)

            event = peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                               timeout=15)
            if event is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            event = peer_dev.wait_global_event(["P2P-GROUP-STARTED"],
                                               timeout=15)
            if event is None:
                raise Exception("Group formation timed out")
            # Kept for group_form_result() in groupStarted().
            self.sta_group_ev = event

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            # The proxy object is looked up but not used further here.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer_dev.group_form_result(self.sta_group_ev)
            sta_addr = peer_dev.group_request("STA-FIRST").splitlines()[0]
            # Force disassociation with different reason code so that the
            # P2P Client using D-Bus does not get normal group termination
            # event from the GO.
            peer_dev.group_request("DEAUTHENTICATE " + sta_addr + " reason=0 test=0")
            peer_dev.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Ignore everything except Groups updates on Peer objects that
            # arrive after the group has been reported as started.
            if (interface_name != WPAS_DBUS_P2P_PEER or
                    not self.group_started or
                    "Groups" not in changed_properties):
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            else:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot callback: False removes the GLib timeout source.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4541
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    # dev[0] (D-Bus) displays PIN 12345670 while the peer enters 87654321,
    # so provisioning is expected to fail instead of starting a group.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            # Arm the start/timeout callbacks, register signal handlers and
            # run the main loop until a handler or the timeout stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.goNegotiationRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationRequest",
                            byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
                            "WpsFailed")
            self.add_signal(self.groupFormationFailure,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFormationFailure")
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            # Accept the negotiation with a PIN that differs from the one the
            # peer was given in run_test().
            connect_args = {
                'peer': path,
                'wps_method': 'display',
                'pin': '12345670',
                'go_intent': 15,
            }
            p2p.Connect(connect_args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            # Stop only once both failure signals have been seen; they can
            # arrive in either order.
            if self.formation_failure:
                self.loop.quit()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            if self.wps_failed:
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not peer_dev.discover_peer(addr0):
                raise Exception("Peer not found")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            # One-shot callback: False removes the GLib timeout source.
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4617
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    # dev[0] (D-Bus) first joins the group run by dev[1] as a client and then
    # starts its own autonomous GO that dev[2] joins.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    # Remembered so that the fresh WpaSupplicant object created inside
    # deviceFound() can address the group interface.
    dev1_group_ifname = dev[1].group_ifname

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None
            self.group1 = None
            self.group2 = None
            self.groups_removed = False

        def __enter__(self):
            # Arm the start/timeout callbacks, register signal handlers and
            # run the main loop until a handler or the timeout stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged", byte_arrays=True)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            # Logged only; no state is derived from property changes here.
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Classify the peer by the address embedded in its object path.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path
            if not self.go or self.group1:
                return

            logger.info("Join the group")
            p2p.StopFind()
            pin = '12345670'
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = dev1_group_ifname
            go_dev.group_request("WPS_PIN any " + pin)
            join_args = {
                'peer': self.go,
                'join': True,
                'wps_method': 'pin',
                'pin': pin,
                'frequency': 2412,
            }
            p2p.Connect(join_args)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            dev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(dev_props))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            group_props = g_obj.GetAll(WPAS_DBUS_GROUP,
                                       dbus_interface=dbus.PROPERTIES_IFACE,
                                       byte_arrays=True)
            logger.debug("Group properties: " + str(group_props))

            if not self.group1:
                # First group: the one joined as a client. Record it and
                # start the second group as an autonomous GO.
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                params = dbus.Dictionary({'frequency': 2412})
                p2p.GroupAdd(params)
            elif not self.group2:
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = group_props['BSSID']

            if self.group1 and self.group2:
                logger.info("Authorize peer to join the group")
                peer_addr = binascii.unhexlify(addr2.replace(':', ''))
                params = {
                    'Role': 'enrollee',
                    'P2PDeviceAddress': dbus.ByteArray(peer_addr),
                    'Bssid': dbus.ByteArray(peer_addr),
                    'Type': 'pin',
                    'Pin': '12345670',
                }
                g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
                g_wps.Start(params)

                # Turn the raw BSSID bytes into colon-separated hex text.
                bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
                joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
                joiner.scan_for_bss(bssid, freq=2412)
                joiner.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
                event = joiner.wait_global_event(["P2P-GROUP-STARTED"],
                                                 timeout=15)
                if event is None:
                    raise Exception("Group join timed out")
                # Kept for group_form_result() in peerJoined().
                self.dev2_group_ev = event

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))

            if self.group1 == properties['group_object']:
                self.group1 = None
            elif self.group2 == properties['group_object']:
                self.group2 = None

            # Finished once both tracked groups have been cleared.
            if not (self.group1 or self.group2):
                self.done = True
                self.loop.quit()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            # Ignore the signal once the teardown below has already run.
            if self.groups_removed:
                return
            self.check_results()

            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.group_form_result(self.dev2_group_ev)
            joiner.remove_group()

            logger.info("Disconnect group2")
            group_p2p = dbus.Interface(self.g2_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

            logger.info("Disconnect group1")
            group_p2p = dbus.Interface(self.g1_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            self.groups_removed = True

        def check_results(self):
            # Verify the roles reported while both groups are operating.
            logger.info("Check results with two concurrent groups in operation")

            g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
            res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
            res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(res1))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(res2))

            dev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(dev_props))

            if res1['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + res1['Role'])
            if res2['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + res2['Role'])
            if dev_props['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + dev_props['Role'])

            if len(res2['Members']) != 1:
                raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot callback: False removes the GLib timeout source.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
0e126c6d 4808
f572ae80
JM
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    # Cancel() before any connection attempt has been started is expected to
    # be rejected with a D-Bus error. The generic Exception raised on
    # unexpected success is not caught by the handler below.
    try:
        p2p.Cancel()
        raise Exception("Unexpected p2p.Cancel() success")
    except dbus.exceptions.DBusException:
        # Expected outcome; the error details are not checked.
        pass

    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Arm the start/timeout callbacks, subscribe to DeviceFound and
            # run the main loop until a handler or the timeout stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Start a connection attempt and cancel it right away; this time
            # Cancel() must not raise.
            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 0}
            p2p.Connect(args)
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the GLib timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4855
0e126c6d
JM
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Baseline: a normal Introspect call must mention these names.
    res = if_obj.Introspect(WPAS_DBUS_IFACE,
                            dbus_interface=dbus.INTROSPECTABLE_IFACE)
    logger.info("Initial Introspect: " + str(res))
    required = ("Introspectable", "GroupStarted", "FastReauth", "PassiveScan")
    if res is None or any(name not in res for name in required):
        raise Exception("Unexpected initial Introspect response: " + str(res))

    # NOTE(review): alloc_fail comes from utils; presumably it arms an
    # allocation failure for the given call path - confirm there.
    # With this pattern no response at all is expected.
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        failed = if_obj.Introspect(WPAS_DBUS_IFACE,
                                   dbus_interface=dbus.INTROSPECTABLE_IFACE)
        logger.info("Introspect: " + str(failed))
        if failed is not None:
            raise Exception("Unexpected Introspect response")

    # With these patterns a response is still expected, but it has to be
    # shorter than the baseline.
    partial_failures = [
        (1, "=add_interface;wpa_dbus_introspect"),
        (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
        (2, "=add_interface;wpa_dbus_introspect"),
    ]
    for count, funcs in partial_failures:
        with alloc_fail(dev[0], count, funcs):
            partial = if_obj.Introspect(
                WPAS_DBUS_IFACE, dbus_interface=dbus.INTROSPECTABLE_IFACE)
            logger.info("Introspect: " + str(partial))
            if partial is None:
                raise Exception("No Introspect response")
            if len(partial) >= len(res):
                raise Exception("Unexpected Introspect response")
9bbce257
JM
4901
def test_dbus_ap(dev, apdev):
    """D-Bus AddNetwork for AP mode"""
    # dev[0] is configured over D-Bus with a mode=2 network; dev[1] then
    # connects to it as a station.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.started = False

        def __enter__(self):
            # Arm the start/timeout callbacks, register signal handlers and
            # run the main loop until a handler or the timeout stops it.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            # State "completed" ends the wait.
            if properties.get('State') == "completed":
                self.started = True
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'WPA-PSK',
                                       'psk': passphrase,
                                       'mode': 2,
                                       'frequency': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot callback: False removes the GLib timeout source.
            return False

        def success(self):
            return self.started

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
d9e2a057
JM
4959
def test_dbus_connect_wpa_eap(dev, apdev):
    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa-eap"
    # Start from the stock WPA-EAP parameters and override wpa/rsn_pairwise.
    params = hostapd.wpa_eap_params(ssid=ssid)
    params["wpa"] = "3"
    params["rsn_pairwise"] = "CCMP"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Arm the start/timeout callbacks, register signal handlers and
            # run the main loop until a handler or the timeout stops it.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            # State "completed" ends the wait.
            if properties.get('State') == "completed":
                self.done = True
                self.loop.quit()

        def eap(self, status, parameter):
            # EAP progress is logged only.
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'WPA-EAP',
                                       'eap': 'PEAP',
                                       'identity': 'user',
                                       'password': 'password',
                                       'ca_cert': 'auth_serv/ca.pem',
                                       'phase2': 'auth=MSCHAPV2',
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot callback: False removes the GLib timeout source.
            return False

        def success(self):
            return self.done

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
708ec753
JM
5015
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    # The helper switches dev[0] to AP_SCAN 2; always switch back to
    # AP_SCAN 1 afterwards, whether or not the test body succeeded.
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")
5022
def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    # Body of test_dbus_ap_scan_2_ap_mode_scan; the caller restores
    # AP_SCAN 1 afterwards.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    if "OK" not in dev[0].request("AP_SCAN 2"):
        raise Exception("Failed to set AP_SCAN 2")

    # Bring up an open AP-mode network on dev[0].
    netid = dev[0].add_network()
    dev[0].set_network(netid, "mode", "2")
    dev[0].set_network_quoted(netid, "ssid", "wpas-ap-open")
    dev[0].set_network(netid, "key_mgmt", "NONE")
    dev[0].set_network(netid, "frequency", "2412")
    dev[0].set_network(netid, "scan_freq", "2412")
    dev[0].set_network(netid, "disabled", "0")
    dev[0].select_network(netid)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) is None:
        raise Exception("AP failed to start")

    def wait_scan_failed(suffix):
        # Wait for the scan failure report and reject an AP-DISABLED event;
        # suffix distinguishes the first wait from the retry in messages.
        event = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
                                   "AP-DISABLED"], timeout=5)
        if event is None:
            raise Exception("CTRL-EVENT-SCAN-FAILED not seen" + suffix)
        if "AP-DISABLED" in event:
            raise Exception("Unexpected AP-DISABLED event" + suffix)
        return event

    # NOTE(review): fail_test comes from utils; presumably it forces the
    # named driver function to fail once - confirm there.
    with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
        iface.Scan({'Type': 'active',
                    'AllowRoam': True,
                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
        first = wait_scan_failed("")
        if "retry=1" in first:
            # Wait for the retry to scan happen
            wait_scan_failed(" - retry")

    # The AP must still be usable after the failed scan.
    dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
    dev[1].request("DISCONNECT")
    dev[1].wait_disconnected()
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
d679ab74
JM
5066
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    # Associate dev[0] with an open AP so that there is a connection to
    # drop after ExpectDisconnect() has been called.
    ap_params = {"ssid": "test-open"}
    hapd = hostapd.add_ap(apdev[0], ap_params)
    dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")

    # This does not really verify the behavior other than by going through
    # the code path for additional coverage.
    wpas.ExpectDisconnect()
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
2299b543
JM
5081
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    # SaveConfig() is expected to be refused with this specific error. The
    # generic Exception raised on unexpected success is not a DBusException
    # and therefore propagates past the handler.
    try:
        iface.SaveConfig()
        raise Exception("SaveConfig() accepted unexpectedly")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
ce96e65c
JM
5092
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    # Always clear vendor elements for ID 1 afterwards so that a failed run
    # does not leave them configured on dev[0].
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5099
def _test_dbus_vendor_elem(dev, apdev):
    """Exercise VendorElemAdd/VendorElemGet/VendorElemRem over D-Bus.

    First checks that invalid calls are rejected with InvalidArgs and the
    expected detail text, then runs a valid add/get/remove cycle on frame
    ID 1. Raises Exception on any unexpected acceptance, error message,
    length or data mismatch.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start from a known state: nothing configured for frame ID 1.
    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    # Payloads are byte-string literals: identical to plain str on Python 2
    # and the type dbus.ByteArray is built on under Python 3.

    # --- VendorElemAdd(): rejected arguments ---
    try:
        ie = dbus.ByteArray(b"\x00\x00")
        iface.VendorElemAdd(-1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))

    try:
        ie = dbus.ByteArray(b"")
        iface.VendorElemAdd(1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))

    try:
        # Presumably a truncated element (length octet 1, no payload); the
        # test expects it to be reported as a parse error.
        ie = dbus.ByteArray(b"\x00\x01")
        iface.VendorElemAdd(1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))

    # --- VendorElemGet(): rejected arguments ---
    try:
        iface.VendorElemGet(-1)
        raise Exception("Invalid VendorElemGet() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))

    try:
        # Nothing has been added for ID 1 yet.
        iface.VendorElemGet(1)
        raise Exception("Invalid VendorElemGet() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))

    # --- VendorElemRem(): rejected arguments ---
    try:
        ie = dbus.ByteArray(b"\x00\x00")
        iface.VendorElemRem(-1, ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))

    try:
        ie = dbus.ByteArray(b"")
        iface.VendorElemRem(1, ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
            # Distinct label so this failure is not confused with case [1].
            raise Exception("Unexpected error message for invalid VendorElemRemove[2]: " + str(e))

    # --- Valid add/get/remove cycle ---
    # "*" removes everything stored for the ID; accepted even when empty.
    iface.VendorElemRem(1, b"*")

    ie = dbus.ByteArray(b"\x00\x01\x00")
    iface.VendorElemAdd(1, ie)

    val = iface.VendorElemGet(1)
    if len(val) != len(ie):
        raise Exception("Unexpected VendorElemGet length")
    # Lengths are equal at this point, so zip() covers every octet.
    for got, expected in zip(val, ie):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected VendorElemGet data")

    # A second add on the same ID appends to the stored data, so the read
    # back value must equal the concatenation of both elements.
    ie2 = dbus.ByteArray(b"\xe0\x00")
    iface.VendorElemAdd(1, ie2)

    ies = ie + ie2
    val = iface.VendorElemGet(1)
    if len(val) != len(ies):
        raise Exception("Unexpected VendorElemGet length[2]")
    for got, expected in zip(val, ies):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected VendorElemGet data[2]")

    try:
        test_ie = dbus.ByteArray(b"\x01\x01")
        iface.VendorElemRem(1, test_ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
            # Distinct label so this failure is not confused with [1]/[2].
            raise Exception("Unexpected error message for invalid VendorElemRemove[3]: " + str(e))

    # Removing the first element must leave only the second one behind.
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    # After a wildcard removal the ID must be reported as non-existent.
    iface.VendorElemRem(1, b"*")
    try:
        iface.VendorElemGet(1)
        raise Exception("Invalid VendorElemGet() accepted after removal")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))
dbd183c7
JM
5203
def test_dbus_assoc_reject(dev, apdev):
    """D-Bus AssocStatusCode"""
    # Connects to an AP configured with max_listen_interval=1 and waits for
    # a PropertiesChanged signal carrying AssocStatusCode 51. Raises
    # Exception if that status code is not observed.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-open"
    ap_params = {"ssid": ssid,
                 "max_listen_interval": "1"}
    hapd = hostapd.add_ap(apdev[0], ap_params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.assoc_status_seen = False
            self.state = 0

        def __enter__(self):
            # Kick off the connection almost immediately and arm a 15 s
            # watchdog before entering the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'AssocStatusCode' not in properties:
                return
            status = properties['AssocStatusCode']
            # 51 is the only accepted code (presumably the listen interval
            # rejection triggered by the AP setting above).
            if status == 51:
                self.assoc_status_seen = True
            else:
                logger.info("Unexpected status code: " + str(status))
            # Any reported status code ends the wait.
            iface.Disconnect()
            self.loop.quit()

        def run_connect(self, *args):
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'NONE',
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # False tells the main loop not to call this timeout again.
            return False

        def success(self):
            return self.assoc_status_seen

    with TestDbusConnect(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")