]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
dbus: Add a debug print on fill_dict_with_properties() getter failures
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67 1# wpa_supplicant D-Bus interface tests
910eb5b5 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
2ec82e67
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
2ec82e67
JM
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12
910eb5b5 13try:
b4de353c 14 import gobject
910eb5b5
JM
15 import dbus
16 dbus_imported = True
17except ImportError:
18 dbus_imported = False
19
2ec82e67
JM
20import hostapd
21from wpasupplicant import WpaSupplicant
0e126c6d 22from utils import HwsimSkip, alloc_fail
2ec82e67
JM
23from test_ap_tdls import connect_2sta_open
24
# D-Bus service name, root object path and interface names used by the tests
# in this file. Every interface name is derived from the service name.
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
WPAS_DBUS_IFACE = WPAS_DBUS_SERVICE + ".Interface"
WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
WPAS_DBUS_NETWORK = WPAS_DBUS_SERVICE + ".Network"
WPAS_DBUS_BSS = WPAS_DBUS_SERVICE + ".BSS"
WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
WPAS_DBUS_P2P_PEER = WPAS_DBUS_SERVICE + ".Peer"
WPAS_DBUS_GROUP = WPAS_DBUS_SERVICE + ".Group"
WPAS_DBUS_PERSISTENT_GROUP = WPAS_DBUS_SERVICE + ".PersistentGroup"
35
def prepare_dbus(dev):
    """Connect to wpa_supplicant over the system D-Bus.

    dev is the station object whose ifname is looked up with GetInterface().

    Returns a (bus, wpas_obj, path, if_obj) tuple: the system bus, the proxy
    for the root wpa_supplicant object, the object path of the interface and
    the proxy for that interface object.

    Raises HwsimSkip if the dbus module could not be imported or if any step
    of setting up the connection fails.
    """
    if not dbus_imported:
        logger.info("No dbus module available")
        raise HwsimSkip("No dbus module available")
    try:
        from dbus.mainloop.glib import DBusGMainLoop
        # Install the GLib main loop as the default for D-Bus connections.
        DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
        wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
        path = wpas.GetInterface(dev.ifname)
        if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
        return (bus, wpas_obj, path, if_obj)
    except Exception as e:
        # Any failure here means the D-Bus interface is not usable in this
        # environment, so the test is skipped rather than failed.
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
2ec82e67
JM
51
class TestDbus(object):
    """Base class for signal-driven D-Bus tests.

    Holds a gobject main loop, the bus and the list of signal receivers
    registered through add_signal(). Subclasses in this file provide
    __enter__() so that instances can be used in a with statement.
    """

    def __init__(self, bus):
        self.bus = bus
        self.signals = []
        self.loop = gobject.MainLoop()

    def __exit__(self, type, value, traceback):
        # Unregister every receiver that add_signal() installed.
        for receiver in self.signals:
            receiver.remove()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal name on interface and remember it."""
        receiver = self.bus.add_signal_receiver(handler,
                                                dbus_interface=interface,
                                                signal_name=name,
                                                byte_arrays=byte_arrays)
        self.signals.append(receiver)

    def timeout(self, *args):
        """Timer callback: stop the main loop; False makes it one-shot."""
        logger.debug("timeout")
        self.loop.quit()
        return False
72
0e126c6d
JM
class alloc_fail_dbus(object):
    """Context manager for running a D-Bus operation with TEST_ALLOC_FAIL set.

    On entry the "TEST_ALLOC_FAIL <count>:<funcs>" command is sent to dev;
    HwsimSkip is raised if the reply does not contain "OK".

    On exit:
    - if the body did not raise, an Exception is raised since the operation
      was expected to fail;
    - a dbus.exceptions.DBusException whose text contains expected is
      suppressed;
    - otherwise GET_ALLOC_FAIL is queried and an Exception is raised unless
      it returns "0:<funcs>"; if it does, the original exception propagates.
    """

    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):
        self._dev, self._count, self._funcs = dev, count, funcs
        self._operation, self._expected = operation, expected

    def __enter__(self):
        reply = self._dev.request("TEST_ALLOC_FAIL %d:%s" % (self._count,
                                                            self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        if type is None:
            raise Exception("%s succeeded during out-of-memory" % self._operation)
        expected_error = (type == dbus.exceptions.DBusException and
                          self._expected in str(value))
        if expected_error:
            return True
        remaining = self._dev.request("GET_ALLOC_FAIL")
        if remaining != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" % self._operation)
        return False
93
2ec82e67
JM
def start_ap(ap):
    """Add the "test-wps" AP on ap['ifname'] and return hostapd.add_ap()'s result."""
    params = dict(ssid="test-wps",
                  eap_server="1",
                  wps_state="2",
                  wpa_passphrase="12345678",
                  wpa="2",
                  wpa_key_mgmt="WPA-PSK",
                  rsn_pairwise="CCMP",
                  ap_pin="12345670")
    return hostapd.add_ap(ap['ifname'], params)
101
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    # Exercises GetAll()/Get() on the root, interface, BSS and network
    # objects, first with no BSS/network entries and then with one of each.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    # Use the constants so the log shows the names that were actually queried.
    logger.debug("GetAll(%s, %s) ==> %s" % (WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                                            str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected BSSs entry: " + str(res))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected Networks entry: " + str(res))

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing BSSs entry: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    # Format the BSSID byte array as colon-separated lowercase hex so it can
    # be compared with the AP's configured BSSID string.
    bssid_str = ':'.join('%02x' % item for item in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
    ssid = props['Properties']['ssid']
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
161
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Get property prop of WPAS_DBUS_SERVICE from wpas_obj and return it.

    When expect is not None, an Exception is raised if the value read
    differs from it. byte_arrays is passed through to the Get() call.
    """
    val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                       dbus_interface=dbus.PROPERTIES_IFACE,
                       byte_arrays=byte_arrays)
    mismatch = expect is not None and val != expect
    if mismatch:
        details = (prop, str(val), str(expect))
        raise Exception("Unexpected %s: %s (expected: %s)" % details)
    return val
170
def dbus_set(dbus, wpas_obj, prop, val):
    """Set property prop of WPAS_DBUS_SERVICE on wpas_obj to val."""
    properties_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
                 dbus_interface=properties_iface)
174
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    # Each section sets a property to valid and invalid values and checks
    # both the stored value and the error text reported for invalid input.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    for (val, err) in [(3, "Error.Failed: wrong property type"),
                       ("foo", "Error.Failed: wrong debug level value")]:
        try:
            dbus_set(dbus, wpas_obj, "DebugLevel", val)
            raise Exception("Invalid DebugLevel value accepted: " + str(val))
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message: " + str(e))
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # The two boolean debug properties go through the same sequence:
    # True -> False, reject a non-boolean value, then restore True.
    for prop in ["DebugTimestamp", "DebugShowKeys"]:
        dbus_get(dbus, wpas_obj, prop, expect=True)
        dbus_set(dbus, wpas_obj, prop, False)
        dbus_get(dbus, wpas_obj, prop, expect=False)
        try:
            dbus_set(dbus, wpas_obj, prop, "foo")
            raise Exception("Invalid %s value accepted" % prop)
        except dbus.exceptions.DBusException as e:
            if "Error.Failed: wrong property type" not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        dbus_set(dbus, wpas_obj, prop, True)
        dbus_get(dbus, wpas_obj, prop, expect=True)

    res = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(res) != 1:
        raise Exception("Unexpected Interfaces value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(res) < 5 or "TTLS" not in res:
        raise Exception("Unexpected EapMethods value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(res) < 2 or "p2p" not in res:
        raise Exception("Unexpected Capabilities value: " + str(res))

    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    val = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if val != res:
        raise Exception("WFDIEs value changed")
    try:
        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00'))
        raise Exception("Invalid WFDIEs value accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(res) != 0:
        raise Exception("WFDIEs not cleared properly")

    # Setting EapMethods is expected to be rejected as read-only.
    res = dbus_get(dbus, wpas_obj, "EapMethods")
    try:
        dbus_set(dbus, wpas_obj, "EapMethods", res)
        raise Exception("Invalid Set accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Property is read-only" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    try:
        wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                        dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Unknown method accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    try:
        wpas_obj.Get("foo", "DebugShowKeys",
                     dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Get accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: No such property" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    # A proxy created with introspect=False is used for the calls below that
    # pass arguments of the wrong type.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    try:
        test_obj.Get(123, "DebugShowKeys",
                     dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Get accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arguments" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    try:
        test_obj.Get(WPAS_DBUS_SERVICE, 123,
                     dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Get accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arguments" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    try:
        wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
                     dbus.ByteArray(b'', variant_level=2),
                     dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: invalid message format" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
297
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # A method name that does not exist on the WPS interface.
    try:
        wps.Foo()
        raise Exception("Unknown method accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))

    # An existing method called with an argument of the wrong type; the proxy
    # is created with introspect=False for this call.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
    try:
        test_wps.Start(123)
        raise Exception("WPS.Start with incorrect signature accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
318
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    # Settings the test body changes; they are set back to these values
    # whether or not the body raises.
    restore_cmds = ("SET wps_cred_processing 0",
                    "SET config_methods display keypad virtual_display nfc_interface p2ps")
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        for cmd in restore_cmds:
            dev[0].request(cmd)
2ec82e67
JM
326
def _test_dbus_get_set_wps(dev, apdev):
    # Body of test_dbus_get_set_wps: checks the ConfigMethods and
    # ProcessCredentials WPS properties through Get/Set and verifies that
    # changing ProcessCredentials produces both PropertiesChanged signals.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        # Only wps_cred_processing=1 is expected to read back as False.
        expected_val = i != 1
        if val != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))

    class TestDbusGetSet(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Handler for PropertiesChanged on the WPS interface itself.
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            # Handler for PropertiesChanged on the Properties interface.
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # Must be the same attribute the signal handlers test; otherwise
            # the loop can only end through the timeout.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
417
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Every dictionary below is expected to be rejected by WPS.Start() with
    # an InvalidArgs error.
    failures = [ {'Role': 'foo', 'Type': 'pbc'},
                 {'Role': 123, 'Type': 'pbc'},
                 {'Type': 'pbc'},
                 {'Role': 'enrollee'},
                 {'Role': 'registrar'},
                 {'Role': 'enrollee', 'Type': 123},
                 {'Role': 'enrollee', 'Type': 'foo'},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'Bssid': '02:33:44:55:66:77'},
                 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'Bssid': dbus.ByteArray(b'12345')},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'P2PDeviceAddress': 12345},
                 {'Role': 'enrollee', 'Type': 'pbc',
                  'P2PDeviceAddress': dbus.ByteArray(b'12345')},
                 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
    for args in failures:
        try:
            wps.Start(args)
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
447
0e126c6d
JM
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    # Each alloc_fail_dbus block sets TEST_ALLOC_FAIL for the named
    # function(s) and expects the D-Bus call inside it to fail.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    sta = dev[0]

    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_state", "Get"):
        if_obj.Get(WPAS_DBUS_IFACE, "State",
                   dbus_interface=dbus.PROPERTIES_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq=2412)

    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_bsss", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    net_id = sta.add_network()
    sta.set_network(net_id, "disabled", "0")
    sta.set_network_quoted(net_id, "ssid", "test")

    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_networks", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(sta, 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    for count in range(1, 6):
        with alloc_fail_dbus(sta, count, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    with alloc_fail_dbus(sta, 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        val2 = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(sta, 1,
                         "=wpa_config_add_network;wpas_dbus_handler_wps_start",
                         "WPS.Start",
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
499
2ec82e67
JM
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2; set it back to 0
        # whether or not the body raised.
        sta.request("SET wps_cred_processing 0")
506
def _test_dbus_wps_pbc(dev, apdev):
    # Body of test_dbus_wps_pbc: starts WPS PBC on the AP, checks the WPS
    # information in the BSS entry and then runs the enrollee through D-Bus
    # while waiting for the "success" event and the Credentials signal.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_paths) != 1:
        raise Exception("Missing BSSs entry: " + str(bss_paths))
    bss_path = bss_paths[0]
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_path)
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_path, str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    wps_info = props['WPS']
    if 'Type' not in wps_info:
        raise Exception("No Type field in the WPS dictionary")
    wps_type = wps_info['Type']
    if wps_type != 'pbc':
        raise Exception("Unexpected WPS Type: " + wps_type)

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False
            self.wps = wps

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            # Stop only once both the event and the credentials were seen.
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
576
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2; set it back to 0
        # whether or not the body raised.
        sta.request("SET wps_cred_processing 0")
583
def _test_dbus_wps_pin(dev, apdev):
    # Body of test_dbus_wps_pin: the AP accepts PIN 12345670 from any
    # enrollee and the station starts WPS PIN through D-Bus, waiting for the
    # "success" event and the Credentials signal.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # Convert "aa:bb:..." into the raw bytes of the BSSID.
            # binascii.unhexlify works on both Python 2 and 3, unlike
            # str.decode('hex').
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': bssid_ay})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
637
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2; set it back to 0
        # whether or not the body raised.
        sta.request("SET wps_cred_processing 0")
644
def _test_dbus_wps_pin2(dev, apdev):
    # Body of test_dbus_wps_pin2: WPS.Start() is called without a PIN, the
    # PIN returned by wpa_supplicant is then given to the AP, and the test
    # waits for the "success" event and the Credentials signal.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Must exist before any signal arrives: wpsEvent() and success()
            # read it even when no Credentials signal has been received.
            self.credentials_received = False
            self.failed = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # binascii.unhexlify works on both Python 2 and 3, unlike
            # str.decode('hex').
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            # Hand the PIN returned by Start() to the AP.
            pin = res['Pin']
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
700
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2; set it back to 0
        # whether or not the body raised.
        sta.request("SET wps_cred_processing 0")
707
def _test_dbus_wps_pin_m2d(dev, apdev):
    # Body of test_dbus_wps_pin_m2d: the station starts WPS PIN before the
    # AP has been given the PIN; the PIN is added on the AP only when the
    # "m2d" event is reported, after which "success" and Credentials are
    # expected.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()
            elif name == "m2d":
                # Give the AP the PIN only now, after the m2d event.
                h = hostapd.Hostapd(apdev[0]['ifname'])
                h.request("WPS_PIN any 12345670")

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # binascii.unhexlify works on both Python 2 and 3, unlike
            # str.decode('hex').
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': bssid_ay})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
763
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        # The test body sets wps_cred_processing to 2; set it back to 0
        # whether or not the body raised.
        sta.request("SET wps_cred_processing 0")
770
def _test_dbus_wps_reg(dev, apdev):
    # Body of test_dbus_wps_reg: the station calls WPS.Start() with the
    # registrar role and PIN 12345670 and waits for the Credentials signal.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            # Events are only logged; completion is decided by credentials().
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            # binascii.unhexlify works on both Python 2 and 3, unlike
            # str.decode('hex').
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '')))
            wps.Start({'Role': 'registrar', 'Type': 'pin',
                       'Pin': '12345670', 'Bssid': bssid_ay})
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
818
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Seventeen one- or two-character SSIDs, "1" through "17".
    many_ssids = [ dbus.ByteArray(str(i).encode('ascii'))
                   for i in range(1, 18) ]

    # (Scan() arguments, substring expected in the resulting D-Bus error)
    tests = [ ({}, "InvalidArgs"),
              ({'Type': 123}, "InvalidArgs"),
              ({'Type': 'foo'}, "InvalidArgs"),
              ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
              ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
              ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
              ({'Type': 'active', 'SSIDs': many_ssids}, "InvalidArgs"),
              ({'Type': 'active',
                'SSIDs': [ dbus.ByteArray(b"1234567890abcdef1234567890abcdef1") ]},
               "InvalidArgs"),
              ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
              ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
              ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
              ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
              ({'Type': 'active',
                'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
               "InvalidArgs"),
              ({'Type': 'active',
                'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
               "InvalidArgs"),
              ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
              ({'Type': 'passive', 'IEs': [ dbus.ByteArray(b"\xdd\x00") ]},
               "InvalidArgs"),
              ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray(b"foo") ]},
               "InvalidArgs")]
    for (t, err) in tests:
        try:
            iface.Scan(t)
            raise Exception("Invalid Scan() arguments accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
866
0e126c6d
JM
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # All requests use the same single (frequency, width) channel entry.
    channels = [(dbus.UInt32(2412), dbus.UInt32(20))]

    # (allocation failure location, extra alloc_fail_dbus() keyword
    # arguments, Scan() arguments)
    # NOTE(review): alloc_fail_dbus() is defined elsewhere in this file;
    # presumably it arms the allocation failure and verifies that the D-Bus
    # call inside the block fails with the expected error - confirm there.
    cases = [
        ("wpa_scan_clone_params;wpas_dbus_handler_scan",
         {'expected': "ScanError: Scan request rejected"},
         {'Type': 'passive',
          'Channels': channels}),
        ("=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
         {},
         {'Type': 'passive',
          'Channels': channels}),
        ("=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
         {},
         {'Type': 'active',
          'IEs': [dbus.ByteArray("\xdd\x00")],
          'Channels': channels}),
        ("=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
         {},
         {'Type': 'active',
          'SSIDs': [dbus.ByteArray("open"),
                    dbus.ByteArray()],
          'Channels': channels}),
    ]

    for (funcs, extra, scan_args) in cases:
        with alloc_fail_dbus(dev[0], 1, funcs, "Scan", **extra):
            iface.Scan(scan_args)
898
2ec82e67
JM
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })

    class TestDbusScan(TestDbus):
        # Runs three scans in sequence (active, passive on one channel,
        # passive without channel list) and tracks ScanDone/BSSAdded signals.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = 0
            self.bss_added = False
            self.fail_reason = None

        def __enter__(self):
            # Kick off the first scan from the main loop and arm the timeout
            # before registering for the signals of interest.
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            for (handler, signal) in [(self.scanDone, "ScanDone"),
                                      (self.bssAdded, "BSSAdded"),
                                      (self.bssRemoved, "BSSRemoved")]:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            done = self.scan_completed
            if done == 1:
                # Second scan: passive, single channel.
                second = {'Type': 'passive',
                          'AllowRoam': True,
                          'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
                iface.Scan(second)
            elif done == 2:
                # Third scan: passive, no channel list.
                third = {'Type': 'passive',
                         'AllowRoam': False}
                iface.Scan(third)
            elif done == 3 and self.bss_added:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            # The AP in this test is not configured for WPS, so a WPS
            # dictionary with a Type entry is treated as a failure.
            if 'Type' in properties.get('WPS', {}):
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            first = {'Type': 'active',
                     'SSIDs': [ dbus.ByteArray("open"),
                                dbus.ByteArray() ],
                     'IEs': [ dbus.ByteArray("\xdd\x00"),
                              dbus.ByteArray() ],
                     'AllowRoam': False,
                     'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(first)
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    def current_bsss():
        # Read the BSSs property of the interface object.
        return if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    with TestDbusScan(bus) as t:
        if t.fail_reason:
            raise Exception(t.fail_reason)
        if not t.success():
            raise Exception("Expected signals not seen")

    if len(current_bsss()) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    if len(current_bsss()) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
979
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan through the control interface and wait until it is running.
    started = dev[0].request("SCAN freq=2412-2462")
    if "OK" not in started:
        raise Exception("Failed to start scan")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    # A D-Bus scan request issued now has to be rejected.
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    # Let the original scan finish before returning.
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1001
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # Drives a connect/disconnect sequence from PropertiesChanged signals.
        # self.state walks 0..7:
        #   0 -> 1  first connection completed, Disconnect()
        #   1 -> 2  disconnected, SelectNetwork()
        #   2 -> 3  connected again, Disconnect()
        #   3 -> 4  disconnected, Reassociate()
        #   4 -> 5  connected, Reattach()
        #   5 -> 6  completed again, SignalPoll() + RemoveNetwork()
        #   6 -> 7  disconnected, done
        # -1 marks an unexpected SignalPoll() result.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for (handler, signal) in [
                    (self.networkAdded, "NetworkAdded"),
                    (self.networkRemoved, "NetworkRemoved"),
                    (self.networkSelected, "NetworkSelected"),
                    (self.propertiesChanged, "PropertiesChanged")]:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')
            if new_state == "completed":
                self._on_completed()
            elif new_state == "disconnected":
                self._on_disconnected()

        def _on_completed(self):
            # Advance the sequence after a State=completed notification.
            if self.state == 0:
                self.state = 1
                iface.Disconnect()
            elif self.state == 2:
                self.state = 3
                iface.Disconnect()
            elif self.state == 4:
                self.state = 5
                iface.Reattach()
            elif self.state == 5:
                self.state = 6
                poll = iface.SignalPoll()
                logger.debug("SignalPoll: " + str(poll))
                freq_ok = 'frequency' in poll and poll['frequency'] == 2412
                if not freq_ok:
                    self.state = -1
                    logger.info("Unexpected SignalPoll result")
                iface.RemoveNetwork(self.netw)

        def _on_disconnected(self):
            # Advance the sequence after a State=disconnected notification.
            if self.state == 1:
                self.state = 2
                iface.SelectNetwork(self.netw)
            elif self.state == 3:
                self.state = 4
                iface.Reassociate()
            elif self.state == 6:
                self.state = 7
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'WPA-PSK',
                                        'psk': passphrase,
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback.
            return False

        def success(self):
            signals_seen = (self.network_added and
                            self.network_removed and
                            self.network_selected)
            if not signals_seen:
                return False
            return self.state == 7

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1098
53f4ed68
JM
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # Adds a network without a stored PSK and supplies the passphrase
        # when a PSK_PASSPHRASE NetworkRequest signal arrives.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for (handler, signal) in [
                    (self.propertiesChanged, "PropertiesChanged"),
                    (self.networkRequest, "NetworkRequest")]:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') == "completed":
                self.connected = True
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PSK_PASSPHRASE":
                return
            # The reply carries the passphrase wrapped in double quotes.
            quoted = '"' + passphrase + '"'
            iface.NetworkReply(path, field, quoted)

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'WPA-PSK',
                                        'mem_only_psk': 1,
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback.
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1152
0e126c6d
JM
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Skip when the build does not accept the TEST_ALLOC_FAIL command.
    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # Same connect/disconnect sequence as in test_dbus_connect (state
        # 0..7, -1 on unexpected SignalPoll result), but with a short timeout
        # and with AddNetwork()/SelectNetwork() failures tolerated since the
        # sequence is run while allocation failures are being injected.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({ 'ssid': ssid,
                                     'key_mgmt': 'WPA-PSK',
                                     'psk': passphrase,
                                     'scan_freq': 2412 },
                                   signature='sv')
            # Either call may fail because of the injected allocation
            # failure; stop the main loop instead of waiting for the timeout.
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Fail the i-th allocation and run the full sequence each time. Stop
    # after five iterations in which GET_ALLOC_FAIL no longer reports a
    # value starting with "0:".
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            # NOTE(review): this exception is raised inside the try block and
            # is therefore absorbed by the handler below; as written it cannot
            # fail the test. Confirm whether that is intended.
            if i < 3:
                raise Exception("Connection succeeded during out-of-memory")
            if not state.startswith('0:'):
                count += 1
                if count == 5:
                    break
        except Exception as e:
            # Failures are expected while allocations are being failed, so
            # keep iterating, but leave a trace of what was ignored. Only
            # Exception is caught so that KeyboardInterrupt/SystemExit can
            # still stop this long loop.
            logger.info("Iteration %d - exception ignored: %s" % (i, str(e)))
2ec82e67
JM
1283
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Both methods have to fail with a NotConnected error when there is no
    # connection.
    for method in ["Disconnect", "Reattach"]:
        try:
            getattr(iface, method)()
        except dbus.exceptions.DBusException as e:
            if "NotConnected" not in str(e):
                raise Exception("Unexpected error message for invalid " + method + ": " + str(e))
        else:
            raise Exception(method + "() accepted when not connected")
1302
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # self.state sequence:
        #   0 -> 1  connected; EAPLogoff() and set matching altsubject_match
        #   1 -> 2  disconnected; EAPLogon() + SelectNetwork()
        #   2 -> 3  connected; Disconnect() and set non-matching constraint
        #   3 -> 4  disconnected; SelectNetwork()
        #   4 -> 5  EAP signal reports 'AltSubject mismatch'; done
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def _set_altsubject_match(self, mismatch):
            # Update altsubject_match of the added network, either to the
            # dNSName seen in the Certification signal or to that value with
            # "FOO" appended so that it cannot match.
            net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
            if mismatch:
                value = self.server_dnsname + "FOO"
            else:
                value = self.server_dnsname
            props = dbus.Dictionary({ 'altsubject_match': value },
                                    signature='sv')
            net_obj.Set(WPAS_DBUS_NETWORK, "Properties", props,
                        dbus_interface=dbus.PROPERTIES_IFACE)

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')
            if new_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    self._set_altsubject_match(False)
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    self._set_altsubject_match(True)
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            altsubject = args['altsubject']
            if len(altsubject) < 1:
                raise Exception("Missing dNSName")
            dnsname = altsubject[0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            # Remembered for the altsubject_match constraints set later.
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if (status, parameter) == ('completion', 'success'):
                self.eap_status = True
            mismatch_reported = (status == 'remote certificate verification' and
                                 parameter == 'AltSubject mismatch')
            if self.state == 4 and mismatch_reported:
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field == "PASSWORD":
                iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'IEEE8021X',
                                        'eapol_flags': 0,
                                        'eap': 'TTLS',
                                        'anonymous_identity': 'ttls',
                                        'identity': 'pap user',
                                        'ca_cert': 'auth_serv/ca.pem',
                                        'phase2': 'auth=PAP',
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback.
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 5

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1416
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Add a network using several D-Bus value types and remove it again.
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'WPA-PSK',
                             'psk': "12345678",
                             'identity': dbus.ByteArray([ 1, 2 ]),
                             'priority': dbus.Int32(0),
                             'scan_freq': dbus.UInt32(2412) },
                           signature='sv')
    netw = iface.AddNetwork(args)
    iface.RemoveNetwork(netw)

    # The removed network object must no longer be accepted.
    try:
        iface.RemoveNetwork(netw)
        raise Exception("Invalid RemoveNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    try:
        iface.SelectNetwork(netw)
        raise Exception("Invalid SelectNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            # Report the method that was actually called (SelectNetwork).
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: initial value, toggling, and wrong value type.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(Enabled,1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))

    # Updating one entry through Properties must leave the others untouched.
    args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # A second call with no networks left is expected to succeed as well.
    iface.RemoveAllNetworks()

    # AddNetwork() arguments that have to be rejected with InvalidArgs.
    tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
              dbus.Dictionary({ 'identity': dbus.ByteArray() },
                              signature='sv'),
              dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
              dbus.Dictionary({ 'identity': "" }, signature='sv') ]
    for args in tests:
        try:
            iface.AddNetwork(args)
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1510
0e126c6d
JM
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def add_and_remove(net_args):
        # AddNetwork() may either fail or succeed in the cases below; when
        # it succeeds, remove the network again. D-Bus errors are ignored.
        try:
            added = iface.AddNetwork(net_args)
            iface.RemoveNetwork(added)
        except dbus.exceptions.DBusException as e:
            pass

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)

    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                               signature='sv')
        # Currently, AddNetwork() succeeds even if os_strdup() for path
        # fails, so remove the network if that occurs.
        add_and_remove(args)

    for i in range(1, 3):
        with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
            # Currently, AddNetwork() succeeds even if network registration
            # fails, so remove the network if that occurs.
            add_and_remove(args)

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                               signature='sv')
        netw = iface.AddNetwork(args)

    # (allocation count, failure location, AddNetwork() arguments)
    set_props = '=set_network_properties;wpas_dbus_handler_add_network'
    tests = [
        (1,
         'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
         dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
                         signature='sv')),
        (1, set_props,
         dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
        (1, set_props,
         dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
        (1, set_props,
         dbus.Dictionary({ 'priority': dbus.UInt32(1) },
                         signature='sv')),
        (1, set_props,
         dbus.Dictionary({ 'priority': dbus.Int32(1) },
                         signature='sv')),
        (1, set_props,
         dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
                         signature='sv')),
    ]
    for (count, funcs, args) in tests:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            netw = iface.AddNetwork(args)

    # None of the failed attempts may have left a network block behind.
    remaining = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(remaining) > 0:
        raise Exception("Unexpected network block added")
    if len(dev[0].list_networks()) > 0:
        raise Exception("Unexpected network block visible")
1590
2ec82e67
JM
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    # Create an interface for "lo" and verify GetInterface() returns the
    # same object path.
    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # Creating the same interface again has to fail with InterfaceExists.
    params = dbus.Dictionary({ 'Ifname': 'lo',
                               'Driver': 'none',
                               'ConfigFile': 'foo',
                               'BridgeIfname': 'foo', },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Removing works once; the second attempt has to fail.
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
        raise Exception("Invalid RemoveInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))

    # Unknown dictionary entry.
    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
                               'Foo': 123 },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Missing Ifname.
    params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # The interface was removed above, so it must not be found anymore.
    try:
        wpas.GetInterface("lo")
        raise Exception("Invalid GetInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            # Report the method that was actually called (GetInterface).
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
1648
0e126c6d
JM
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        wpas.CreateInterface(params)

    # Fail the i-th allocation within interface addition until
    # CreateInterface() succeeds and GET_ALLOC_FAIL no longer reports a
    # value starting with "0:".
    for i in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        try:
            npath = wpas.CreateInterface(params)
            wpas.RemoveInterface(npath)
        except dbus.exceptions.DBusException as e:
            # Expected while the allocation failure is still being hit.
            continue

        logger.info("CreateInterface succeeds after %d allocation failures" % i)
        state = dev[0].request('GET_ALLOC_FAIL')
        logger.info("GET_ALLOC_FAIL: " + state)
        dev[0].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL 0:")
        if i < 5:
            raise Exception("CreateInterface succeeded during out-of-memory")
        if not state.startswith('0:'):
            break

    # Allocation failure while handling each of the string arguments.
    for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
            wpas.CreateInterface(params)
1683
2ec82e67
JM
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/RemoveBlob/GetBlob parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Add a blob; adding another one with the same name has to fail.
    blob = dbus.ByteArray("\x01\x02\x03")
    iface.AddBlob('blob1', blob)
    try:
        iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
        raise Exception("Invalid AddBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))

    # The stored data must match what was added, byte by byte.
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for (got, want) in zip(res, blob):
        if got != dbus.Byte(want):
            raise Exception("Unexpected blob data")
    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal, both RemoveBlob() and GetBlob() have to fail.
    iface.RemoveBlob('blob1')
    try:
        iface.RemoveBlob('blob1')
        raise Exception("Invalid RemoveBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    try:
        iface.GetBlob('blob1')
        raise Exception("Invalid GetBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))

    class TestDbusBlob(TestDbus):
        # Adds and removes 'blob2' and checks that the BlobAdded and
        # BlobRemoved signals are delivered for it.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # One-shot timeout callback.
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1758
0e126c6d
JM
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the 1st, 2nd and 3rd allocation made under
    # wpas_dbus_handler_add_blob in turn.
    for count in range(1, 4):
        with alloc_fail_dbus(dev[0], count, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
1768
2ec82e67
JM
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    if_obj = prepare_dbus(dev[0])[3]
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Exercise AutoScan() with an unknown module name, a periodic
    # configuration and finally an empty string.
    for autoscan_arg in ["foo", "periodic:1", ""]:
        iface.AutoScan(autoscan_arg)
    # Clear the autoscan setting through the control interface as well.
    dev[0].request("AUTOSCAN ")
1778
0e126c6d
JM
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    if_obj = prepare_dbus(dev[0])[3]
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Make the first allocation in the AutoScan handler fail.
    oom = alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan")
    with oom:
        iface.AutoScan("foo")
    # Clear the autoscan setting through the control interface.
    dev[0].request("AUTOSCAN ")
1787
2ec82e67
JM
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)
    addr1 = dev[1].p2p_interface_addr()

    def expect_invalid_args(method):
        # Call the named TDLS method with a malformed peer address and
        # require an InvalidArgs D-Bus error in response.
        try:
            getattr(iface, method)("foo")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid " + method + ": " + str(e))
        else:
            raise Exception("Invalid " + method + "() accepted")

    expect_invalid_args("TDLSDiscover")
    expect_invalid_args("TDLSStatus")

    # A well-formed address without a TDLS link reports a missing peer.
    status = iface.TDLSStatus(addr1)
    if status != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    expect_invalid_args("TDLSSetup")
    expect_invalid_args("TDLSTeardown")

    # Tearing down a link to a syntactically valid but unknown peer fails
    # with a different error.
    try:
        iface.TDLSTeardown("00:11:22:33:44:55")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: error performing TDLS teardown" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("TDLSTeardown accepted for unknown peer")
1835
0e126c6d
JM
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    if_obj = prepare_dbus(dev[0])[3]
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first allocation in wpa_tdls_add_peer and expect TDLSSetup()
    # to report the setup error string below.
    expected_error = "UnknownError: error performing TDLS setup"
    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected_error):
        iface.TDLSSetup("00:11:22:33:44:55")
1844
2ec82e67
JM
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Timer-driven sequence: discover -> setup -> status check ->
        # teardown -> status check. Each step schedules the next one.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Only logged; the result is decided by the status checks below.
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            # Teardown is requested regardless of the status seen above.
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            # Last step of the sequence; stop the main loop.
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1911
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # With a bogus engine path the call may fail, but only with the EAPOL
    # reinit error. A successful call is tolerated here.
    for module_path in ["bar", ""]:
        try:
            iface.SetPKCS11EngineAndModulePath("foo", module_path)
        except dbus.exceptions.DBusException as e:
            if "Error.Failed: Reinit of the EAPOL" not in str(e):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))

    # With an empty engine path the call has to succeed and both properties
    # must reflect the values that were just set.
    for module_path in ["bar", ""]:
        iface.SetPKCS11EngineAndModulePath("", module_path)
        engine = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
                            dbus_interface=dbus.PROPERTIES_IFACE)
        if engine != "":
            raise Exception("Unexpected PKCS11EnginePath value: " + engine)
        module = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
                            dbus_interface=dbus.PROPERTIES_IFACE)
        if module != module_path:
            raise Exception("Unexpected PKCS11ModulePath value: " + module)
1948
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    # Run the real test body and always send "AP_SCAN 1" afterwards,
    # whether the body passed or raised (presumably to restore the default
    # for later test cases).
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")
1955
def _test_dbus_apscan(dev, apdev):
    # Body of test_dbus_apscan: read, write and mis-write the ApScan property.
    if_obj = prepare_dbus(dev[0])[3]

    initial = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if initial != 1:
        raise Exception("Unexpected initial ApScan value: %d" % initial)

    # Every value in 0..2 must be accepted and read back unchanged.
    for wanted in range(3):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(wanted),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        current = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != wanted:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (current, wanted))

    # Wrong D-Bus type for the property.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,-1) accepted")

    # Correct type, value outside the allowed set.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,123) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
               dbus_interface=dbus.PROPERTIES_IFACE)
1990
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values must be accepted and read back unchanged.
    for value in [ False, True ]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(value),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != value:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, value))

    # A non-boolean D-Bus type has to be rejected.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            # The message names FastReauth; it previously said ApScan.
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))
    else:
        raise Exception("Invalid Set(FastReauth,-1) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
2018
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Valid values must be accepted and read back unchanged. The expected
    # value is passed to the error message explicitly; the message used to
    # reference an undefined name and raised NameError on a mismatch.
    for prop, value in [ ("BSSExpireAge", 179), ("BSSExpireCount", 3) ]:
        if_obj.Set(WPAS_DBUS_IFACE, prop, dbus.UInt32(value),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != value:
            raise Exception("Unexpected %s value %d (expected %d)" % (prop, res, value))

    # (property, rejected value, label used in messages, expected error)
    invalid = [ ("BSSExpireAge", dbus.Int16(-1), "Set(BSSExpireAge,-1)",
                 "Error.Failed: wrong property type"),
                ("BSSExpireAge", dbus.UInt32(9), "Set(BSSExpireAge,9)",
                 "Error.Failed: BSSExpireAge must be >= 10"),
                ("BSSExpireCount", dbus.Int16(-1), "Set(BSSExpireCount,-1)",
                 "Error.Failed: wrong property type"),
                ("BSSExpireCount", dbus.UInt32(0), "Set(BSSExpireCount,0)",
                 "Error.Failed: BSSExpireCount must be > 0") ]
    for prop, value, label, err in invalid:
        try:
            if_obj.Set(WPAS_DBUS_IFACE, prop, value,
                       dbus_interface=dbus.PROPERTIES_IFACE)
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid " + label + ": " + str(e))
        else:
            raise Exception("Invalid " + label + " accepted")

    # Leave the interface with 180 / 2 once the checks are done.
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
2073
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    # Run the real test body; afterwards always set the country back to "00"
    # both in wpa_supplicant and through "iw reg set", whether the body
    # passed or raised.
    try:
        _test_dbus_country(dev, apdev)
    finally:
        dev[0].request("SET country 00")
        subprocess.call(['iw', 'reg', 'set', '00'])
2081
def _test_dbus_country(dev, apdev):
    # Body of test_dbus_country: set/get the Country property and check the
    # resulting regulatory domain events.
    if_obj = prepare_dbus(dev[0])[3]

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    dev[0].dump_monitor()

    if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
               dbus_interface=dbus.PROPERTIES_IFACE)
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    event = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
    if event is None:
        raise Exception("regdom change event not seen")
    if "init=USER type=COUNTRY alpha2=FI" not in event:
        raise Exception("Unexpected event contents: " + event)

    # Wrong D-Bus type for the property.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
    else:
        raise Exception("Invalid Set(Country,-1) accepted")

    # A one-letter string is not a valid country code.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: invalid country code" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
    else:
        raise Exception("Invalid Set(Country,F) accepted")

    # Switch back to the world regulatory domain and wait for the event.
    if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
               dbus_interface=dbus.PROPERTIES_IFACE)

    event = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
    if event is None:
        raise Exception("regdom change event not seen")
    if "init=CORE type=WORLD" not in event:
        raise Exception("Unexpected event contents: " + event)
2127
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    # Run the real test body and always send "SCAN_INTERVAL 5" afterwards,
    # whether the body passed or raised.
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        dev[0].request("SCAN_INTERVAL 5")
2134
def _test_dbus_scan_interval(dev, apdev):
    # Body of test_dbus_scan_interval: set/get the ScanInterval property and
    # verify that invalid types and values are rejected.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    expected = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(expected),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        # Report the value that was set; the message used to reference an
        # undefined name and raised NameError here.
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    # ScanInterval is set as Int32 above; UInt16 is the wrong property type.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
    else:
        raise Exception("Invalid Set(ScanInterval,100) accepted")

    # Correct type, negative value.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ScanInterval,-1) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
2163
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # dev[0] runs as a P2P group owner while dev[1] does a social-channel
    # find; the test then waits for a ProbeRequest signal on dev[0].
    dev[0].p2p_start_go(freq=2412)
    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        # Subscribes to Probe Request reporting and waits for one report.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            iface.SubscribeProbeReq()
            self.loop.run()
            return self

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            # One report is enough; stop the main loop.
            self.loop.quit()

        def run_test(self, *args):
            # Nothing to trigger here beyond logging.
            logger.debug("run_test")
            return False

        def success(self):
            return self.reported

    with TestDbusProbe(bus) as first:
        if not first.success():
            raise Exception("Expected signals not seen")
        iface.UnsubscribeProbeReq()

    # Unsubscribing a second time, without a subscription, must fail.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException as e:
        if "NoSubscription" not in str(e):
            raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
    else:
        raise Exception("Invalid UnsubscribeProbeReq() accepted")

    with TestDbusProbe(bus) as second:
        if not second.success():
            raise Exception("Expected signals not seen")
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
    dev[0].remove_group()
2218
0e126c6d
JM
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    if_obj = prepare_dbus(dev[0])[3]
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Need to make sure this process has not already subscribed to avoid false
    # failures due to the operation succeeding due to os_strdup() not even
    # getting called.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException as e:
        was_subscribed = False
    else:
        was_subscribed = True

    # Fail the first allocation in the SubscribeProbeReq handler.
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
                         "SubscribeProbeReq"):
        iface.SubscribeProbeReq()

    if was_subscribed:
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.
        iface.SubscribeProbeReq()
2242
2ec82e67
JM
def test_dbus_p2p_invalid(dev, apdev):
    """D-Bus invalid P2P operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # RejectPeer: unknown peer object and a path that is not a peer at all.
    try:
        p2p.RejectPeer(path + "/Peers/00112233445566")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
    else:
        raise Exception("Invalid RejectPeer accepted")

    try:
        p2p.RejectPeer("/foo")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
    else:
        raise Exception("Invalid RejectPeer accepted")

    # Find: each of these argument dictionaries has to be rejected.
    tests = [ {'DiscoveryType': 'foo'},
              {'RequestedDeviceTypes': 'foo'},
              {'RequestedDeviceTypes': ['foo']},
              {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
                                        '10','11','12','13','14','15','16',
                                        '17']},
              {'RequestedDeviceTypes': dbus.Array([], signature="s")},
              {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
              {'RequestedDeviceTypes': dbus.Array([], signature="i")},
              {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
                                        dbus.ByteArray('1234567')]},
              {'Foo': dbus.Int16(1)},
              {'Foo': dbus.UInt16(1)},
              {'Foo': dbus.Int64(1)},
              {'Foo': dbus.UInt64(1)},
              {'Foo': dbus.Double(1.23)},
              {'Foo': dbus.Signature('s')},
              {'Foo': 'bar'}]
    for t in tests:
        try:
            p2p.Find(dbus.Dictionary(t))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Find(): " + str(e))
        else:
            raise Exception("Invalid Find accepted")

    # RemovePersistentGroup: paths that do not name a persistent group of
    # this interface.
    for p in [ "/foo",
               "/fi/w1/wpa_supplicant1/Interfaces/1234",
               "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
        try:
            p2p.RemovePersistentGroup(dbus.ObjectPath(p))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
        else:
            raise Exception("Invalid RemovePersistentGroup accepted")

    # Listen with P2P disabled; P2P is always re-enabled afterwards.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Listen(5)
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Could not start P2P listen" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))
    else:
        raise Exception("Invalid Listen accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # A proxy created with introspect=False is used to send a string where
    # Listen() takes a number (presumably so that the Python bindings do not
    # reject the argument type before it reaches wpa_supplicant).
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    try:
        test_p2p.Listen("foo")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))
    else:
        raise Exception("Invalid Listen accepted")

    # ExtendedListen with P2P disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.ExtendedListen(dbus.Dictionary({}))
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
    else:
        raise Exception("Invalid ExtendedListen accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # PresenceRequest with P2P disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'duration1': 30000, 'interval1': 102400,
                 'duration2': 20000, 'interval2': 102400 }
        p2p.PresenceRequest(args)
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to invoke presence request" not in str(e):
            raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
    else:
        raise Exception("Invalid PresenceRequest accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # GroupAdd: negative frequency, then a persistent group object path that
    # is actually the interface path.
    try:
        params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
        p2p.GroupAdd(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
    else:
        raise Exception("Invalid GroupAdd accepted")

    try:
        params = dbus.Dictionary({'persistent_group_object':
                                  dbus.ObjectPath(path),
                                  'frequency': 2412})
        p2p.GroupAdd(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
    else:
        raise Exception("Invalid GroupAdd accepted")

    # Disconnect is expected to fail at this point (no group was started in
    # this test).
    try:
        p2p.Disconnect()
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to disconnect" not in str(e):
            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
    else:
        raise Exception("Invalid Disconnect accepted")

    # Flush with P2P disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Flush()
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Flush: " + str(e))
    else:
        raise Exception("Invalid Flush accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect with P2P disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'peer': path,
                 'join': True,
                 'wps_method': 'pbc',
                 'frequency': 2412 }
        p2p.Connect(args)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Connect: " + str(e))
    else:
        raise Exception("Invalid Connect accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect with incomplete or invalid argument dictionaries.
    tests = [ { 'frequency': dbus.Int32(-1) },
              { 'wps_method': 'pbc' },
              { 'wps_method': 'foo' } ]
    for args in tests:
        try:
            p2p.Connect(args)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Connect: " + str(e))
        else:
            raise Exception("Invalid Connect accepted")

    # Invite with P2P disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = { 'peer': path }
        p2p.Invite(args)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Invite: " + str(e))
    else:
        raise Exception("Invalid Invite accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Invite with an unknown dictionary key. The failure message names
    # Invite; it previously said Connect.
    try:
        args = { 'foo': 'bar' }
        p2p.Invite(args)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid Invite: " + str(e))
    else:
        raise Exception("Invalid Invite accepted")

    # ProvisionDiscoveryRequest: (peer path, config method, expected error)
    unknown_peer = dbus.ObjectPath(path + "/Peers/00112233445566")
    pd_failed = "UnknownError: Failed to send provision discovery request"
    tests = [ (path, 'display', "InvalidArgs"),
              (unknown_peer, 'display', pd_failed),
              (unknown_peer, 'keypad', pd_failed),
              (unknown_peer, 'pbc', pd_failed),
              (unknown_peer, 'pushbutton', pd_failed),
              (unknown_peer, 'foo', "InvalidArgs") ]
    for (p, method, err) in tests:
        try:
            p2p.ProvisionDiscoveryRequest(p, method)
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
        else:
            raise Exception("Invalid ProvisionDiscoveryRequest accepted")

    # Reading the Peers property with P2P disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
    else:
        raise Exception("Invalid Get(Peers) accepted")
    finally:
        dev[0].request("P2P_SET disabled 0")
2451
0e126c6d
JM
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # String array parsing: fail the 1st and then the 2nd allocation.
    for count in [ 1, 2 ]:
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_string_array",
                             "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))

    # Same function, 10th allocation, with a nine-entry string array.
    nine_strings = [ '1','2','3','4','5','6','7','8','9' ]
    with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array",
                         "Find", "InvalidArgs"):
        p2p.Find(dbus.Dictionary({ 'Foo': nine_strings }))

    # Array-of-byte-arrays parsing with a single element.
    with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
                         "Find", "InvalidArgs"):
        p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))

    with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
                         "Find", "InvalidArgs"):
        p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))

    # Second allocation in the binarray parser, with eleven elements.
    eleven_blobs = [ dbus.ByteArray('123') for _ in range(11) ]
    with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
                         "Find", "InvalidArgs"):
        p2p.Find(dbus.Dictionary({ 'Foo': eleven_blobs }))

    with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
                         "Find", "InvalidArgs"):
        p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))

    with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
                         "Find", "InvalidArgs"):
        p2p.Find(dbus.Dictionary({ 'Foo': path }))

    # AddService with a 500-byte response: fail the 1st and then the 2nd
    # allocation in the byte array parser.
    args = { 'service_type': 'bonjour',
             'response': dbus.ByteArray(500*'b') }
    for count in [ 1, 2 ]:
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(args)
2508
2ec82e67
JM
def test_dbus_p2p_discovery(dev, apdev):
    """D-Bus P2P discovery"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # dev[1]: listening peer configured with a secondary device type and a
    # vendor element; deviceFound() below checks both in its peer properties.
    dev[1].request("SET sec_device_type 1-0050F204-2")
    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()
    a1 = binascii.unhexlify(addr1.replace(':',''))

    # dev[2]: listening peer with a WFD subelement; deviceFound() compares
    # the peer's IEs property against the value built here.
    wfd_devinfo = "00001c440028"
    dev[2].request("SET wifi_display 1")
    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
    wfd = binascii.unhexlify('000006' + wfd_devinfo)
    dev[2].p2p_listen()
    addr2 = dev[2].p2p_dev_addr()
    a2 = binascii.unhexlify(addr2.replace(':',''))

    # No peers are expected before the first Find().
    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if 'Peers' not in res:
        raise Exception("GetAll result missing Peers")
    if len(res['Peers']) != 0:
        raise Exception("Unexpected peer(s) in the list")

    # Exercise Find() with the optional arguments and stop it right away.
    args = {'DiscoveryType': 'social',
            'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
            'Timeout': dbus.Int32(1) }
    p2p.Find(dbus.Dictionary(args))
    p2p.StopFind()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False   # dev[1] seen and verified
            self.found2 = False  # dev[2] seen and verified
            self.lost = False    # DeviceLost signal seen

        def __enter__(self):
            # Start the test from the main loop and arm a 15000 ms timeout
            # before registering the signal handlers and running the loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceLost")
            self.add_signal(self.provisionDiscoveryResponseEnterPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryResponseEnterPin")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if len(res) < 1:
                raise Exception("Unexpected number of peers")
            if path not in res:
                raise Exception("Mismatch in peer object path")
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("peer properties: " + str(res))

            if res['DeviceAddress'] == a1:
                if 'SecondaryDeviceTypes' not in res:
                    raise Exception("Missing SecondaryDeviceTypes")
                sec = res['SecondaryDeviceTypes']
                if len(sec) < 1:
                    raise Exception("Secondary device type missing")
                if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
                    raise Exception("Secondary device type mismatch")

                if 'VendorExtension' not in res:
                    raise Exception("Missing VendorExtension")
                vendor = res['VendorExtension']
                if len(vendor) < 1:
                    raise Exception("Vendor extension missing")
                if "\x11\x22\x33\x44" not in vendor:
                    # This check is about the vendor extension, so report
                    # it as such instead of as a device type mismatch.
                    raise Exception("Vendor extension mismatch")

                self.found = True
            elif res['DeviceAddress'] == a2:
                if 'IEs' not in res:
                    raise Exception("IEs missing")
                if res['IEs'] != wfd:
                    raise Exception("IEs mismatch")
                self.found2 = True
            else:
                raise Exception("Unexpected peer device address")

            if self.found and self.found2:
                # Both peers verified: stop the search and issue RejectPeer()
                # and ProvisionDiscoveryRequest() for the peer found last.
                p2p.StopFind()
                p2p.RejectPeer(path)
                p2p.ProvisionDiscoveryRequest(path, 'display')

        def deviceLost(self, path):
            logger.debug("deviceLost: path=%s" % path)
            self.lost = True
            # RejectPeer() for the lost peer is expected to fail.
            try:
                p2p.RejectPeer(path)
                raise Exception("Invalid RejectPeer accepted")
            except dbus.exceptions.DBusException as e:
                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
            self.loop.quit()

        def provisionDiscoveryResponseEnterPin(self, peer_object):
            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
            p2p.Flush()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.found and self.lost and self.found2

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
    dev[1].p2p_stop_find()

    # Reverse direction: dev[0] listens through D-Bus and dev[2] must be able
    # to discover it.
    p2p.Listen(1)
    dev[2].p2p_stop_find()
    dev[2].request("P2P_FLUSH")
    if not dev[2].discover_peer(addr0):
        raise Exception("Peer not found")
    p2p.StopFind()
    dev[2].p2p_stop_find()

    # An unknown dictionary key has to be rejected with InvalidArgs.
    try:
        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))

    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
    p2p.ExtendedListen(dbus.Dictionary({}))
    dev[0].global_request("P2P_EXT_LISTEN")
2658
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    def expect_invalid_args(method, arg, accepted_msg, name):
        # Call method(arg) and require a D-Bus error mentioning "InvalidArgs".
        # accepted_msg is raised if the call unexpectedly succeeds; name is
        # the method name used in the "unexpected error" report.
        try:
            method(arg)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid %s(): " % name + str(e))
        else:
            raise Exception(accepted_msg)

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
    upnp_service = 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'

    args = { 'service_type': 'bonjour',
             'query': bonjour_query,
             'response': bonjour_response }
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # The full AddService() argument set is expected to be rejected by
    # DeleteService().
    expect_invalid_args(p2p.DeleteService, args,
                        "Invalid DeleteService() accepted", "DeleteService")

    # Deleting by query works once; the second attempt must fail.
    args = { 'service_type': 'bonjour',
             'query': bonjour_query }
    p2p.DeleteService(args)
    expect_invalid_args(p2p.DeleteService, args,
                        "Invalid DeleteService() accepted", "DeleteService")

    # Same add/delete/delete-again sequence for a UPnP service.
    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': upnp_service }
    p2p.AddService(args)
    p2p.DeleteService(args)
    expect_invalid_args(p2p.DeleteService, args,
                        "Invalid DeleteService() accepted", "DeleteService")

    # Argument sets that DeleteService() must reject.
    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'foo', 'query': bonjour_query },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': upnp_service },
              { 'version': 0x10,
                'service': upnp_service },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(p2p.DeleteService, args,
                            "Invalid DeleteService() accepted",
                            "DeleteService")

    # Argument sets that AddService() must reject.
    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': upnp_service },
              { 'version': 0x10,
                'service': upnp_service },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'response': 'foo' },
              { 'service_type': 'bonjour', 'query': bonjour_query },
              { 'service_type': 'bonjour', 'response': bonjour_response },
              { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(p2p.AddService, args,
                            "Invalid AddService() accepted", "AddService")

    # A request reference can be cancelled once; cancelling it again, or
    # cancelling reference 0, must fail.
    args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    expect_invalid_args(p2p.ServiceDiscoveryCancelRequest, ref,
                        "Invalid ServiceDiscoveryCancelRequest() accepted",
                        "ServiceDiscoveryCancelRequest")
    expect_invalid_args(p2p.ServiceDiscoveryCancelRequest, dbus.UInt64(0),
                        "Invalid ServiceDiscoveryCancelRequest() accepted",
                        "ServiceDiscoveryCancelRequest")

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'ssdp:foo' }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    # Argument sets that ServiceDiscoveryRequest() must reject.
    tests = [ { 'service_type': 'foo' },
              { 'foo': 'bar' },
              { 'tlv': 'foo' },
              { },
              { 'version': 0 },
              { 'service_type': 'upnp',
                'service': 'ssdp:foo' },
              { 'service_type': 'upnp',
                'version': 0x10 },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers") },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': path + "/Peers" },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
    for args in tests:
        expect_invalid_args(p2p.ServiceDiscoveryRequest, args,
                            "Invalid ServiceDiscoveryRequest accepted",
                            "ServiceDiscoveryRequest")

    args = { 'foo': 'bar' }
    expect_invalid_args(p2p.ServiceDiscoveryResponse,
                        dbus.Dictionary(args, signature='sv'),
                        "Invalid ServiceDiscoveryResponse accepted",
                        "ServiceDiscoveryResponse")
2807
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    # dev[1] registers a Bonjour service and listens so that dev[0] can find
    # it and send a service discovery request to it.
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once a ServiceDiscoveryResponse signal has been received.
            self.done = False

        def __enter__(self):
            # Kick off run_test() from the loop and arm a 15000 ms timeout.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Query the newly found peer with a raw TLV request.
            tlv = dbus.ByteArray("\x02\x00\x00\x01")
            query = { 'peer_object': path, 'tlv': tlv }
            p2p.ServiceDiscoveryRequest(query)

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = {'DiscoveryType': 'social',
                           'Timeout': dbus.Int32(10)}
            p2p.Find(dbus.Dictionary(find_params))
            # One-shot timeout callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
2859
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    # Run the actual test body and always switch external service discovery
    # processing back off on dev[0], whether the test passed or raised.
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
2866
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    # Test body for test_dbus_p2p_service_discovery_external(): dev[1] sends
    # a service discovery request to dev[0], and dev[0] answers it through
    # the D-Bus ServiceDiscoveryResponse() method.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hex encoded TLVs used as the response; also checked in the event that
    # dev[1] reports at the end.
    resp = "0300000101"

    dev[1].request("P2P_FLUSH")
    dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    dev[1].p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once the ServiceDiscoveryRequest signal has been handled.
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryRequest")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Echo the addressing fields of the request and attach the TLVs.
            reply = {}
            for key in ('peer_object', 'frequency', 'dialog_token'):
                reply[key] = sd_request[key]
            reply['tlvs'] = dbus.ByteArray(binascii.unhexlify(resp))
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(reply,
                                                         signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            # One-shot timeout callback.
            return False

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")

    # dev[1] must report the response coming from dev[0] with the same TLVs.
    ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    fields = ev.split(' ')
    if fields[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    dev[1].p2p_stop_find()

    p2p.StopFind()
    p2p.ServiceDiscoveryExternal(0)
2933
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True          # first GroupStarted not yet seen
            self.waiting_end = False   # set before the final Disconnect()
            self.done = False          # PersistentGroupRemoved seen

        def __enter__(self):
            # Start the test from the main loop and arm a 15000 ms timeout
            # before registering the signal handlers and running the loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                raise Exception("Unexpected role reported: " + role)
            group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                               dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                            dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                raise Exception("Unexpected PeerGO value: " + str(go))
            if self.first:
                # First start: tear the group down so that groupFinished()
                # re-starts it from the persistent group object.
                self.first = False
                logger.info("Remove persistent group instance")
                p2p.Disconnect()
            else:
                # Re-started group: have wlan1 join it using a PIN.
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Saved for provisionDiscoveryRequestDisplayPin(), which reads
            # the peer's DeviceAddress from it.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object

            # WPS.Start() of type 'pin' without a 'Pin' entry must be
            # rejected with InvalidArgs.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin' }
            try:
                wps.Start(params)
                raise Exception("Invalid WPS.Start() accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message: " + str(e))
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            if 'Groups' not in res or len(res['Groups']) != 1:
                raise Exception("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                raise Exception("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                raise Exception("Unexpected number of group members")

            def expect_set_rejected(value, expected_err):
                # Set(WPSVendorExtensions) with value must fail with a D-Bus
                # error whose text contains expected_err.
                try:
                    g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', value,
                              dbus_interface=dbus.PROPERTIES_IFACE)
                except dbus.exceptions.DBusException as e:
                    if expected_err not in str(e):
                        raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
                else:
                    raise Exception("Invalid Set(WPSVendorExtensions) accepted")

            ext = dbus.ByteArray("\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != ext:
                raise Exception("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted as
            # well.
            res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value that was read back; the local list trivially
            # has two entries after the append above.
            if len(res2) != 2:
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                raise Exception("Vendor extension value changed")

            # Twelve entries in total are expected to be rejected.
            for i in range(10):
                res.append(dbus.ByteArray('\xaa\xbb'))
            expect_set_rejected(res, "Error.Failed")

            # Dictionary form with an unexpected key.
            vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
            expect_set_rejected(vals, "InvalidArgs")

            # Values of the wrong type.
            expect_set_rejected([ "foo" ], "Error.Failed")
            expect_set_rejected([ [ "foo" ] ], "Error.Failed")

            # All checks done: end the group; groupFinished() then removes
            # the persistent group.
            self.waiting_end = True
            p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
3159
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False  # GroupFinished seen

        def __enter__(self):
            # Start the test from the main loop and arm a 15000 ms timeout
            # before registering the signal handlers and running the loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.provisionDiscoveryPBCRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryPBCRequest")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            # Have wlan1 join the new group using PBC.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Saved for provisionDiscoveryPBCRequest(), which reads the
            # peer's DeviceAddress from it.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pbc' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # The peer got in: end the group, which leads to groupFinished().
            p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'frequency': 2412})
            p2p.GroupAdd(params)
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
3246
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once the GroupFinished signal has been received.
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            pin = '12345670'
            # Enable the PIN on the group through D-Bus, then have wlan1 use
            # the same PIN with a plain WPS_PIN command.
            wps.Start({ 'Role': 'enrollee',
                        'Type': 'pin',
                        'Pin': pin })
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(addr0, freq=2412)
            sta.request("WPS_PIN " + addr0 + " " + pin)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # Station got in: disconnect it and end the group.
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            group_params = dbus.Dictionary({'frequency': 2412})
            p2p.GroupAdd(group_params)
            # One-shot timeout callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")
3306
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # dev[1] runs the group as GO; dev[2] listens and is invited later.
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev[2].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False  # GroupFinished seen
            self.peer = None   # object path of dev[2] once found
            self.go = None     # object path of dev[1] once found

        def __enter__(self):
            # Start the test from the main loop and arm a 15000 ms timeout
            # before registering the signal handlers and running the loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationResult")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # Identify the device from the address embedded in the path.
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path
            if self.peer and self.go:
                logger.info("Join the group")
                p2p.StopFind()
                args = { 'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'frequency': 2412 }
                # Connect() returns the PIN, which is then entered on the GO.
                pin = p2p.Connect(args)

                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                               dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                            dbus_interface=dbus.PROPERTIES_IFACE)
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                # NOTE(review): the GetAll() result is passed as the value;
                # presumably its content does not matter since the call is
                # expected to be rejected - confirm.
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            args = { 'duration1': 30000, 'interval1': 102400,
                     'duration2': 20000, 'interval2': 102400 }
            p2p.PresenceRequest(args)

            # Invite dev[2]; invitationResult() continues from here.
            args = { 'peer': self.peer }
            p2p.Invite(args)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Remove the group on the GO, which leads to groupFinished().
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the gobject timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
3425
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    # Run the actual test body and always send the ssid_postfix reset command
    # to dev[0] afterwards, whether the test passed or raised.
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        dev[0].request("P2P_SET ssid_postfix ")
3432
def _test_dbus_p2p_config(dev, apdev):
    """Exercise Get/Set of the P2PDeviceConfig D-Bus property on dev[0].

    Steps:
      1. Read the property, write the same value back and verify that a
         second read returns identical entries.
      2. Set SsidPostfix, VendorExtension and SecondaryDeviceTypes and verify
         that one entry of each list-valued item is reported.
      3. Clear those items again and verify the list-valued ones disappear.
      4. With "P2P_SET disabled 1" in effect, verify that both Get and Set
         are rejected with the "P2P is not available" error.
      5. Verify that dictionaries with wrong value types or an unknown key
         are rejected with InvalidArgs.

    Raises Exception on any mismatch. The caller (test_dbus_p2p_config) is
    responsible for clearing the SSID postfix afterwards.
    """
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    def get_config():
        # Fetch the current P2PDeviceConfig dictionary.
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(values):
        # Write a P2PDeviceConfig dictionary.
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", values,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Step 1: writing back what was read must not change anything.
    res = get_config()
    set_config(res)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    # Step 2: set a postfix plus one vendor extension and one secondary
    # device type.
    changes = { 'SsidPostfix': 'foo',
                'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
                'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Step 3: empty values must remove the list-valued entries again.
    changes = { 'SsidPostfix': '',
                'VendorExtension': dbus.Array([], signature="ay"),
                'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
    set_config(dbus.Dictionary(changes, signature='sv'))

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Step 4: with P2P disabled both accessors must fail. P2P is re-enabled
    # in the finally clauses so later steps and tests are not affected.
    not_available = "Error.Failed: P2P is not available for this interface"

    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if not_available not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    try:
        dev[0].request("P2P_SET disabled 1")
        changes = { 'SsidPostfix': 'foo' }
        set_config(dbus.Dictionary(changes, signature='sv'))
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if not_available not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Step 5: wrong value types and an unknown key must be refused.
    tests = [ { 'DeviceName': 123 },
              { 'SsidPostfix': 123 },
              { 'Foo': 'Bar' } ]
    for changes in tests:
        try:
            set_config(dbus.Dictionary(changes, signature='sv'))
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
3521
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    # Flow: start a persistent group over D-Bus, tear it down once it is
    # reported as started, then exercise the PersistentGroup object and the
    # persistent-group management methods of the P2PDevice interface.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path delivered by PersistentGroupAdded; stays None if
            # that signal is never received.
            self.persistent = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The group only needs to exist long enough to be stored.
            p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the timeout callback from being rearmed.
            return False

        def success(self):
            # The remainder of the test needs the persistent group path, so
            # the run only counts as successful once it has been reported.
            return self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Modify the stored group through its Properties dictionary and verify
    # that only the ssid entry differs afterwards.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    # The value is reported with surrounding double quotes.
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # Add a second persistent group directly; two must now be listed.
    args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
                             'psk': '1234567890' }, signature='sv')
    p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # Removing an already removed group must be rejected.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
3611
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # Flow: start a persistent group, let wlan1 join it with a PIN, tear the
    # group down, then re-invoke it with Invite. The first GroupFinished
    # triggers the invitation; the second one ends the test.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False
            self.invited = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            p2p_signals = [
                (self.deviceFound, "DeviceFound"),
                (self.groupStarted, "GroupStarted"),
                (self.groupFinished, "GroupFinished"),
                (self.persistentGroupAdded, "PersistentGroupAdded"),
                (self.provisionDiscoveryRequestDisplayPin,
                 "ProvisionDiscoveryRequestDisplayPin"),
            ]
            for handler, signal in p2p_signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal)
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            if self.invited:
                # Re-invoked group: nothing to do here.
                return
            # First group: have wlan1 find the GO and join with a PIN.
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.scan_for_bss(addr0, freq=2412)
            joiner.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                self.done = True
                self.loop.quit()
                return

            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.request("SET persistent_reconnect 1")
            joiner.p2p_listen()

            # "path" here is the interface object path from prepare_dbus(),
            # i.e. not a persistent group object, so this must be refused.
            bad_args = { 'persistent_group_object': dbus.ObjectPath(path),
                         'peer': self.peer_path }
            try:
                p2p.Invite(bad_args)
                raise Exception("Invalid Invite accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Invite: " + str(e))

            good_args = { 'persistent_group_object': self.persistent,
                          'peer': self.peer_path }
            p2p.Invite(good_args)
            self.invited = True

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            found = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = found.GetAll(WPAS_DBUS_P2P_PEER,
                                     dbus_interface=dbus.PROPERTIES_IFACE,
                                     byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            # Colon-separated form of the hex string that ends the peer
            # object path (computed but not used further below).
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ':'.join(['%02x' % ord(p) for p in peer])
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.remove_group()
            removed = joiner.wait_event(["P2P-GROUP-REMOVED"], timeout=10)
            if removed is None:
                raise Exception("Group removal timed out")
            p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3730
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # Flow: dev[0] listens, wlan1 discovers it and starts GO Negotiation;
    # the GONegotiationRequest handler answers over D-Bus, and the group is
    # disconnected as soon as it has started.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra keyword arguments) in the order
            # the receivers are registered.
            receivers = [
                (self.deviceFound, "DeviceFound", {}),
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.groupFinished, "GroupFinished", {}),
            ]
            for handler, signal, extra in receivers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d" % (path, dev_passwd_id))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)

            # First attempt carries frequency 5175 and is expected to be
            # refused with ConnectChannelUnsupported.
            rejected = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                         'go_intent': 15, 'persistent': False, 'frequency': 5175 }
            try:
                p2p.Connect(rejected)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            # Same request without the frequency entry.
            accepted = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                         'go_intent': 15, 'persistent': False }
            p2p.Connect(accepted)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            initiator = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not initiator.discover_peer(addr0):
                raise Exception("Peer not found")
            initiator.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3809
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # Flow: dev[0] finds wlan1, pre-authorizes GO Negotiation with
    # authorize_only and listens; wlan1 then initiates the connection. The
    # test passes once the group has finished and both PeerJoined and
    # PeerDisconnected were seen on the Group interface.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Keypad method without a 'pin' entry must be refused.
            args = { 'peer': path, 'wps_method': 'keypad',
                     'go_intent': 15, 'authorize_only': True }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            # Valid authorization, then wait for the peer to initiate.
            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 15, 'authorize_only': True }
            p2p.Connect(args)
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Make the peer leave first; the local group is disconnected
            # from staDeauthorized().
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.remove_group()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3905
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # Flow: dev[0] finds wlan1 and initiates GO Negotiation over D-Bus with
    # go_intent 0; wlan1 answers with go_intent 15. Once the group has
    # started both ends leave it, and GroupFinished ends the test.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            receivers = [
                (self.deviceFound, "DeviceFound", {}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.groupFinished, "GroupFinished", {}),
            ]
            for handler, signal, extra in receivers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The wlan1 handle is created before Connect() is issued and is
            # then used to wait for the resulting events.
            responder = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = { 'peer': path, 'wps_method': 'keypad',
                             'pin': '12345670', 'go_intent': 0 }
            p2p.Connect(connect_args)

            if responder.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                           timeout=15) is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            responder.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            if responder.wait_global_event(["P2P-GROUP-STARTED"],
                                           timeout=15) is None:
                raise Exception("Group formation timed out")

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            p2p.Disconnect()
            responder = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            responder.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3974
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    # Flow: wlan1 initiates GO Negotiation entering PIN 87654321 while
    # dev[0] answers with PIN 12345670, so the test waits for the WpsFailed
    # signal and treats GroupStarted as an error.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            receivers = [
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.wpsFailed, "WpsFailed", {}),
            ]
            for handler, signal, extra in receivers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                **extra)
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d" % (path, dev_passwd_id))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            connect_args = { 'peer': path, 'wps_method': 'display',
                             'pin': '12345670', 'go_intent': 15 }
            p2p.Connect(connect_args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            initiator = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not initiator.discover_peer(addr0):
                raise Exception("Peer not found")
            initiator.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4039
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    # Flow: dev[1] runs a GO that dev[0] joins (group 1); dev[0] then starts
    # its own group (group 2) which wlan2 joins. When the peer has joined,
    # the properties of both groups are checked and everything is torn down.
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None
            self.group1 = None
            self.group2 = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged", byte_arrays=True)
            for handler, signal in [(self.deviceFound, "DeviceFound"),
                                    (self.groupStarted, "GroupStarted"),
                                    (self.groupFinished, "GroupFinished")]:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal)
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Peer object paths are matched on the colon-less device address.
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path

            if not self.go or self.group1:
                return

            logger.info("Join the group")
            p2p.StopFind()
            pin = '12345670'
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.request("WPS_PIN any " + pin)
            join_args = { 'peer': self.go,
                          'join': True,
                          'wps_method': 'pin',
                          'pin': pin,
                          'frequency': 2412 }
            p2p.Connect(join_args)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            if not self.group1:
                # First group seen: remember it and start the second one.
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                params = dbus.Dictionary({ 'frequency': 2412 })
                p2p.GroupAdd(params)
            elif not self.group2:
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = res['BSSID']

            if not (self.group1 and self.group2):
                return

            logger.info("Authorize peer to join the group")
            peer_addr = binascii.unhexlify(addr2.replace(':',''))
            params = { 'Role': 'enrollee',
                       'P2PDeviceAddress': dbus.ByteArray(peer_addr),
                       'Bssid': dbus.ByteArray(peer_addr),
                       'Type': 'pin',
                       'Pin': '12345670' }
            g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
            g_wps.Start(params)

            # Textual BSSID of group 2 for the joining device.
            bssid = ':'.join([binascii.hexlify(octet) for octet in self.g2_bssid])
            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.scan_for_bss(bssid, freq=2412)
            joiner.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))

            finished = properties['group_object']
            if self.group1 == finished:
                self.group1 = None
            elif self.group2 == finished:
                self.group2 = None

            # Done only when both groups are gone.
            if self.group1 or self.group2:
                return
            self.done = True
            self.loop.quit()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.check_results()

            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.remove_group()

            logger.info("Disconnect group2")
            group_p2p = dbus.Interface(self.g2_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

            logger.info("Disconnect group1")
            group_p2p = dbus.Interface(self.g1_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def check_results(self):
            logger.info("Check results with two concurrent groups in operation")

            g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
            res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
            res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(res1))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(res2))

            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            # Expected roles: client in group 1, GO in group 2, and plain
            # device on the main P2PDevice interface.
            if res1['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + res1['Role'])
            if res2['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + res2['Role'])
            if prop['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])

            if len(res2['Members']) != 1:
                raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
0e126c6d
JM
4219
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    def introspect():
        # Issue one Introspect call on the interface object.
        return if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)

    # Reference response without any forced allocation failure.
    res = introspect()
    logger.info("Initial Introspect: " + str(res))
    if res is None or "Introspectable" not in res or "GroupStarted" not in res:
        raise Exception("Unexpected initial Introspect response: " + str(res))

    # Failure in wpa_dbus_introspect itself: no response is expected.
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        res2 = introspect()
        logger.info("Introspect: " + str(res2))
        if res2 is not None:
            raise Exception("Unexpected Introspect response")

    # Failures further down the call chain: a response is still expected,
    # but it has to be shorter than the reference one.
    partial_failures = [ (1, "=add_interface;wpa_dbus_introspect"),
                         (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
                         (2, "=add_interface;wpa_dbus_introspect") ]
    for count, funcs in partial_failures:
        with alloc_fail(dev[0], count, funcs):
            res2 = introspect()
            logger.info("Introspect: " + str(res2))
            if res2 is None:
                raise Exception("No Introspect response")
            if len(res2) >= len(res):
                raise Exception("Unexpected Introspect response")
9bbce257
JM
4263
4264def test_dbus_ap(dev, apdev):
4265 """D-Bus AddNetwork for AP mode"""
4266 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4267 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
4268
4269 ssid = "test-wpa2-psk"
4270 passphrase = 'qwertyuiop'
4271
4272 class TestDbusConnect(TestDbus):
4273 def __init__(self, bus):
4274 TestDbus.__init__(self, bus)
4275 self.started = False
4276
4277 def __enter__(self):
4278 gobject.timeout_add(1, self.run_connect)
4279 gobject.timeout_add(15000, self.timeout)
4280 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
4281 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
4282 "NetworkSelected")
4283 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
4284 "PropertiesChanged")
4285 self.loop.run()
4286 return self
4287
4288 def networkAdded(self, network, properties):
4289 logger.debug("networkAdded: %s" % str(network))
4290 logger.debug(str(properties))
4291
4292 def networkSelected(self, network):
4293 logger.debug("networkSelected: %s" % str(network))
4294 self.network_selected = True
4295
4296 def propertiesChanged(self, properties):
4297 logger.debug("propertiesChanged: %s" % str(properties))
4298 if 'State' in properties and properties['State'] == "completed":
4299 self.started = True
4300 self.loop.quit()
4301
4302 def run_connect(self, *args):
4303 logger.debug("run_connect")
4304 args = dbus.Dictionary({ 'ssid': ssid,
4305 'key_mgmt': 'WPA-PSK',
4306 'psk': passphrase,
4307 'mode': 2,
4308 'frequency': 2412 },
4309 signature='sv')
4310 self.netw = iface.AddNetwork(args)
4311 iface.SelectNetwork(self.netw)
4312 return False
4313
4314 def success(self):
4315 return self.started
4316
4317 with TestDbusConnect(bus) as t:
4318 if not t.success():
4319 raise Exception("Expected signals not seen")
4320 dev[1].connect(ssid, psk=passphrase, scan_freq="2412")