]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
Add support for testing memory allocation failures
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67
JM
1# wpa_supplicant D-Bus interface tests
2# Copyright (c) 2014, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
8import gobject
9import logging
10logger = logging.getLogger()
11import subprocess
12import time
13
14import hostapd
15from wpasupplicant import WpaSupplicant
16from test_ap_tdls import connect_2sta_open
17
# D-Bus bus name and root object path used to reach wpa_supplicant, followed
# by the interface names used by the tests. All interface names share the
# service name as their prefix.
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
WPAS_DBUS_IFACE = WPAS_DBUS_SERVICE + ".Interface"
WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
WPAS_DBUS_NETWORK = WPAS_DBUS_SERVICE + ".Network"
WPAS_DBUS_BSS = WPAS_DBUS_SERVICE + ".BSS"
WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
WPAS_DBUS_P2P_PEER = WPAS_DBUS_SERVICE + ".Peer"
WPAS_DBUS_GROUP = WPAS_DBUS_SERVICE + ".Group"
WPAS_DBUS_PERSISTENT_GROUP = WPAS_DBUS_SERVICE + ".PersistentGroup"
28
def prepare_dbus(dev):
    """Open the system bus and look up the D-Bus objects for a device.

    Returns the tuple (dbus, bus, wpas_obj, path, if_obj): the imported dbus
    module, the system bus connection, the root wpa_supplicant object, the
    object path returned by GetInterface(dev.ifname), and the object at that
    path.

    Any exception raised on the way is logged and replaced with
    Exception("hwsim-SKIP").
    """
    try:
        import dbus
        from dbus.mainloop.glib import DBusGMainLoop

        # Install the glib main loop as the default for D-Bus connections
        # before the bus connection is created.
        DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
        wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        root_iface = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
        path = root_iface.GetInterface(dev.ifname)
        if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
        return (dbus, bus, wpas_obj, path, if_obj)
    except Exception as e:
        logger.info("No D-Bus support available: " + str(e))
        raise Exception("hwsim-SKIP")
43
class TestDbus(object):
    """Base class for tests that wait for D-Bus signals in a gobject loop.

    Holds the bus connection, a gobject main loop and the list of signal
    matches registered through add_signal(). Only __exit__ is provided here;
    subclasses supply __enter__ when used in a ``with`` statement.
    """

    def __init__(self, bus):
        self.bus = bus
        self.signals = []
        self.loop = gobject.MainLoop()

    def __exit__(self, type, value, traceback):
        # Unregister every signal receiver added through add_signal().
        for match in self.signals:
            match.remove()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal `name` on `interface` and remember it."""
        match = self.bus.add_signal_receiver(handler,
                                             signal_name=name,
                                             dbus_interface=interface,
                                             byte_arrays=byte_arrays)
        self.signals.append(match)

    def timeout(self, *args):
        """Timer callback: stop the main loop and return False."""
        logger.debug("timeout")
        self.loop.quit()
        return False
64
def start_ap(ap):
    """Add an AP on ap['ifname'] with the fixed "test-wps" configuration.

    Returns whatever hostapd.add_ap() returns.
    """
    params = {
        "ssid": "test-wps",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "ap_pin": "12345670",
    }
    return hostapd.add_ap(ap['ifname'], params)
72
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    props_iface = dbus.PROPERTIES_IFACE

    # GetAll on the root object; the result is only logged. The message uses
    # the real service name and path constants (the previous hard-coded text
    # misspelled the service name) and the same format as the other GetAll
    # log lines in this test.
    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE, dbus_interface=props_iface)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                                         str(props)))

    # GetAll on both interfaces of the per-device object.
    for iface_name in (WPAS_DBUS_IFACE, WPAS_DBUS_IFACE_WPS):
        props = if_obj.GetAll(iface_name, dbus_interface=props_iface)
        logger.debug("GetAll(%s, %s): %s" % (iface_name, path, str(props)))

    # Before any scan or network configuration both lists must be empty.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', dbus_interface=props_iface)
    if len(res) != 0:
        raise Exception("Unexpected BSSs entry: " + str(res))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', dbus_interface=props_iface)
    if len(res) != 0:
        raise Exception("Unexpected Networks entry: " + str(res))

    # Create exactly one BSS entry (by scanning for a new open AP) and one
    # network entry (through the control interface).
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    netid = dev[0].add_network()
    dev[0].set_network(netid, "disabled", "0")
    dev[0].set_network_quoted(netid, "ssid", "test")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', dbus_interface=props_iface)
    if len(res) != 1:
        raise Exception("Missing BSSs entry: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=props_iface)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    # Format the BSSID items as colon-separated two-digit lowercase hex so
    # they can be compared with the AP's bssid string.
    bssid_str = ':'.join('%02x' % octet for octet in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', dbus_interface=props_iface)
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = net_obj.GetAll(WPAS_DBUS_NETWORK, dbus_interface=props_iface)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
    ssid = props['Properties']['ssid']
    # The SSID is expected back including the surrounding double quotes.
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
132
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read property `prop` of the fi.w1.wpa_supplicant1 interface.

    When `expect` is not None the value read must compare equal to it,
    otherwise an Exception is raised. Returns the value read.
    """
    val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                       dbus_interface=dbus.PROPERTIES_IFACE,
                       byte_arrays=byte_arrays)
    mismatch = expect is not None and val != expect
    if mismatch:
        raise Exception("Unexpected %s: %s (expected: %s)" %
                        (prop, str(val), str(expect)))
    return val
141
def dbus_set(dbus, wpas_obj, prop, val):
    """Write `val` to property `prop` of the fi.w1.wpa_supplicant1 interface."""
    props_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, dbus_interface=props_iface)
145
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    props_iface = dbus.PROPERTIES_IFACE

    def expect_dbus_error(call, accepted_msg, expected_err):
        # call() must raise a DBusException whose text contains expected_err;
        # returning normally is reported with accepted_msg.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception(accepted_msg)

    # DebugLevel: change it, reject invalid values, restore it.
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    invalid_levels = [ (3, "Error.Failed: wrong property type"),
                       ("foo", "Error.Failed: wrong debug level value") ]
    for (bad, err) in invalid_levels:
        expect_dbus_error(
            lambda: dbus_set(dbus, wpas_obj, "DebugLevel", bad),
            "Invalid DebugLevel value accepted: " + str(bad),
            err)
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # The two boolean debug flags go through the same sequence.
    for flag in ("DebugTimestamp", "DebugShowKeys"):
        dbus_get(dbus, wpas_obj, flag, expect=True)
        dbus_set(dbus, wpas_obj, flag, False)
        dbus_get(dbus, wpas_obj, flag, expect=False)
        expect_dbus_error(
            lambda: dbus_set(dbus, wpas_obj, flag, "foo"),
            "Invalid %s value accepted" % flag,
            "Error.Failed: wrong property type")
        dbus_set(dbus, wpas_obj, flag, True)
        dbus_get(dbus, wpas_obj, flag, expect=True)

    # Read-only list properties.
    res = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(res) != 1:
        raise Exception("Unexpected Interfaces value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(res) < 5 or "TTLS" not in res:
        raise Exception("Unexpected EapMethods value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(res) < 2 or "p2p" not in res:
        raise Exception("Unexpected Capabilities value: " + str(res))

    # WFDIEs: set, read back, reject a one-byte value, then clear.
    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    wfd_ies = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(wfd_ies))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if wfd_ies != res:
        raise Exception("WFDIEs value changed")
    expect_dbus_error(
        lambda: dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00')),
        "Invalid WFDIEs value accepted",
        "InvalidArgs")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(wfd_ies))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(res) != 0:
        raise Exception("WFDIEs not cleared properly")

    # Writing a read-only property must fail.
    res = dbus_get(dbus, wpas_obj, "EapMethods")
    expect_dbus_error(
        lambda: dbus_set(dbus, wpas_obj, "EapMethods", res),
        "Invalid Set accepted",
        "InvalidArgs: Property is read-only")

    # Unknown method on the properties interface.
    expect_dbus_error(
        lambda: wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                                dbus_interface=props_iface),
        "Unknown method accepted",
        "UnknownMethod")

    # Get with an unknown interface name.
    expect_dbus_error(
        lambda: wpas_obj.Get("foo", "DebugShowKeys",
                             dbus_interface=props_iface),
        "Invalid Get accepted",
        "InvalidArgs: No such property")

    # Proxy created with introspect=False, used to send integer arguments
    # in place of the interface name and the property name.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    expect_dbus_error(
        lambda: test_obj.Get(123, "DebugShowKeys",
                             dbus_interface=props_iface),
        "Invalid Get accepted",
        "InvalidArgs: Invalid arguments")
    expect_dbus_error(
        lambda: test_obj.Get(WPAS_DBUS_SERVICE, 123,
                             dbus_interface=props_iface),
        "Invalid Get accepted",
        "InvalidArgs: Invalid arguments")

    # Set with a value carrying variant_level=2.
    expect_dbus_error(
        lambda: wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
                             dbus.ByteArray('', variant_level=2),
                             dbus_interface=props_iface),
        "Invalid Set accepted",
        "InvalidArgs: invalid message format")
268
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # A method name that the WPS interface does not provide.
    try:
        wps.Foo()
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Unknown method accepted")

    # Proxy created with introspect=False, used to call Start() with an
    # integer argument.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
    try:
        test_wps.Start(123)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("WPS.Start with incorrect signature accepted")
289
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    try:
        result = _test_dbus_get_set_wps(dev, apdev)
    finally:
        # Always put back the settings that the test body changes.
        for cmd in ("SET wps_cred_processing 0",
                    "SET config_methods display keypad virtual_display nfc_interface"):
            dev[0].request(cmd)
    return result
297
def _test_dbus_get_set_wps(dev, apdev):
    """Body of test_dbus_get_set_wps; the caller restores the settings.

    Checks Get/Set of the WPS ConfigMethods and ProcessCredentials
    properties and that changing ProcessCredentials produces a
    PropertiesChanged signal on both the WPS interface and the standard
    properties interface.
    """
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    # A value set through the control interface must be visible over D-Bus.
    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    # A value set over D-Bus must read back unchanged.
    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    # ProcessCredentials is expected to read False only for
    # wps_cred_processing=1 and True for 0 and 2.
    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        expected_val = i != 1
        if val != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))

    class TestDbusGetSet(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # PropertiesChanged as emitted on the WPS interface itself.
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            # PropertiesChanged as emitted on the properties interface; only
            # changes reported for the WPS interface are of interest.
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # This must be the same attribute the signal handlers test.
            # The flag was previously stored as dbus_sets_done, which the
            # handlers never read, so the loop could only end through the
            # 1000 ms timeout.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
388
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Each dictionary must be rejected by WPS.Start() with an InvalidArgs
    # error.
    failures = [
        {'Role': 'foo', 'Type': 'pbc'},
        {'Role': 123, 'Type': 'pbc'},
        {'Type': 'pbc'},
        {'Role': 'enrollee'},
        {'Role': 'registrar'},
        {'Role': 'enrollee', 'Type': 123},
        {'Role': 'enrollee', 'Type': 'foo'},
        {'Role': 'enrollee', 'Type': 'pbc', 'Bssid': '02:33:44:55:66:77'},
        {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
        {'Role': 'enrollee', 'Type': 'pbc', 'Bssid': dbus.ByteArray('12345')},
        {'Role': 'enrollee', 'Type': 'pbc', 'P2PDeviceAddress': 12345},
        {'Role': 'enrollee', 'Type': 'pbc',
         'P2PDeviceAddress': dbus.ByteArray('12345')},
        {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'},
    ]
    for args in failures:
        try:
            wps.Start(args)
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
418
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    try:
        result = _test_dbus_wps_pbc(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; always reset it.
        dev[0].request("SET wps_cred_processing 0")
    return result
425
def _test_dbus_wps_pbc(dev, apdev):
    """Body of test_dbus_wps_pbc; the caller resets wps_cred_processing."""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            # Start PBC from inside the loop; give up after 15 seconds.
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            # Stop once both the success event and the credentials are in.
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            pbc_args = {'Role': 'enrollee', 'Type': 'pbc'}
            self.wps.Start(pbc_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
481
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    try:
        result = _test_dbus_wps_pin(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; always reset it.
        dev[0].request("SET wps_cred_processing 0")
    return result
488
def _test_dbus_wps_pin(dev, apdev):
    """Body of test_dbus_wps_pin; the caller resets wps_cred_processing."""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            # Start the PIN exchange from inside the loop; give up after
            # 15 seconds.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # Colon-separated hex BSSID string -> raw bytes.
            raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
542
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    try:
        result = _test_dbus_wps_pin2(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; always reset it.
        dev[0].request("SET wps_cred_processing 0")
    return result
549
def _test_dbus_wps_pin2(dev, apdev):
    """Body of test_dbus_wps_pin2; the caller resets wps_cred_processing.

    WPS.Start() is called without a 'Pin' entry; the 'Pin' value from its
    result is then passed to the AP with WPS_PIN.
    """
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Must exist before any signal arrives: wpsEvent() and success()
            # both read it, and it was previously only created by
            # credentials(), giving AttributeError when the "success" event
            # came first or no Credentials signal was received at all.
            self.credentials_received = False
            self.failed = False

        def __enter__(self):
            # Start the PIN exchange from inside the loop; give up after
            # 15 seconds.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # Colon-separated hex BSSID string -> raw bytes.
            bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            pin = res['Pin']
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
605
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    try:
        result = _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; always reset it.
        dev[0].request("SET wps_cred_processing 0")
    return result
612
def _test_dbus_wps_pin_m2d(dev, apdev):
    """Body of test_dbus_wps_pin_m2d; the caller resets wps_cred_processing.

    The PIN is given to the AP only after the "m2d" WPS event is seen.
    """
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            # Start the PIN exchange from inside the loop; give up after
            # 15 seconds.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "m2d":
                # Enable the PIN on the AP only now.
                ap_ctrl = hostapd.Hostapd(apdev[0]['ifname'])
                ap_ctrl.request("WPS_PIN any 12345670")
                return
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # Colon-separated hex BSSID string -> raw bytes.
            raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
668
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    try:
        result = _test_dbus_wps_reg(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; always reset it.
        dev[0].request("SET wps_cred_processing 0")
    return result
675
def _test_dbus_wps_reg(dev, apdev):
    """Body of test_dbus_wps_reg; the caller resets wps_cred_processing."""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            # Start the registrar operation 100 ms into the loop; give up
            # after 15 seconds.
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            # WPS events are only logged in this test.
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            # Colon-separated hex BSSID string -> raw bytes.
            raw_bssid = binascii.unhexlify(bssid.replace(':', ''))
            start_args = {'Role': 'registrar', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
723
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Seventeen one- or two-character SSIDs: "1" .. "17".
    seventeen_ssids = [ dbus.ByteArray(str(n)) for n in range(1, 18) ]
    # A single 33-character SSID.
    long_ssid = dbus.ByteArray("1234567890abcdef1234567890abcdef1")

    # Every argument dictionary below must be rejected by Scan().
    invalid_args = [
        {},
        {'Type': 123},
        {'Type': 'foo'},
        {'Type': 'active', 'Foo': 'bar'},
        {'Type': 'active', 'SSIDs': 'foo'},
        {'Type': 'active', 'SSIDs': ['foo']},
        {'Type': 'active', 'SSIDs': seventeen_ssids},
        {'Type': 'active', 'SSIDs': [ long_ssid ]},
        {'Type': 'active', 'IEs': 'foo'},
        {'Type': 'active', 'IEs': ['foo']},
        {'Type': 'active', 'Channels': 2412 },
        {'Type': 'active', 'Channels': [ 2412 ] },
        {'Type': 'active',
         'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
        {'Type': 'active',
         'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
        {'Type': 'active', 'AllowRoam': "yes" },
        {'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
        {'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
    ]
    tests = [ (args, "InvalidArgs") for args in invalid_args ]
    for (t, err) in tests:
        try:
            iface.Scan(t)
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
        else:
            raise Exception("Invalid Scan() arguments accepted: " + str(t))
771
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })

    class TestDbusScan(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.bss_added = False
            self.scan_completed = 0

        def __enter__(self):
            # First scan is triggered from inside the loop; give up after
            # 15 seconds.
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            for (handler, signal) in [ (self.scanDone, "ScanDone"),
                                       (self.bssAdded, "BSSAdded"),
                                       (self.bssRemoved, "BSSRemoved") ]:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            done = self.scan_completed
            if done == 1:
                # Second scan: passive, single channel.
                iface.Scan({'Type': 'passive',
                            'AllowRoam': True,
                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
            elif done == 2:
                # Third scan: passive, no channel list.
                iface.Scan({'Type': 'passive',
                            'AllowRoam': False})
            elif done == 3 and self.bss_added:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            scan_args = {'Type': 'active',
                         'SSIDs': [ dbus.ByteArray("open"),
                                    dbus.ByteArray() ],
                         'IEs': [ dbus.ByteArray("\xdd\x00"),
                                  dbus.ByteArray() ],
                         'AllowRoam': False,
                         'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(scan_args)
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    props_iface = dbus.PROPERTIES_IFACE
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs", dbus_interface=props_iface)
    if len(res) < 1:
        raise Exception("Scan result not in BSSs property")
    # FlushBSS(0) must leave the BSSs property empty.
    iface.FlushBSS(0)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs", dbus_interface=props_iface)
    if len(res) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
845
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan through the control interface and wait until it is
    # reported as started.
    if "OK" not in dev[0].request("SCAN freq=2412-2462"):
        raise Exception("Failed to start scan")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    # A D-Bus Scan() issued now must be rejected.
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
867
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            # Step counter driven by State changes; 7 means the whole
            # sequence completed, -1 marks a bad SignalPoll result.
            self.state = 0

        def __enter__(self):
            # Add and select the network from inside the loop; give up
            # after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for (handler, signal) in [
                    (self.networkAdded, "NetworkAdded"),
                    (self.networkRemoved, "NetworkRemoved"),
                    (self.networkSelected, "NetworkSelected"),
                    (self.propertiesChanged, "PropertiesChanged") ]:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            has_state = 'State' in properties
            if has_state and properties['State'] == "completed":
                self.on_completed()
            if has_state and properties['State'] == "disconnected":
                self.on_disconnected()

        def on_completed(self):
            # Connection completed: 0 and 2 disconnect again, 4 reattaches,
            # 5 checks SignalPoll and removes the network.
            if self.state == 0:
                self.state = 1
                iface.Disconnect()
            elif self.state == 2:
                self.state = 3
                iface.Disconnect()
            elif self.state == 4:
                self.state = 5
                iface.Reattach()
            elif self.state == 5:
                self.state = 6
                res = iface.SignalPoll()
                logger.debug("SignalPoll: " + str(res))
                if 'frequency' not in res or res['frequency'] != 2412:
                    self.state = -1
                    logger.info("Unexpected SignalPoll result")
                iface.RemoveNetwork(self.netw)

        def on_disconnected(self):
            # Disconnected: 1 selects the network again, 3 reassociates,
            # 6 ends the test.
            if self.state == 1:
                self.state = 2
                iface.SelectNetwork(self.netw)
            elif self.state == 3:
                self.state = 4
                iface.Reassociate()
            elif self.state == 6:
                self.state = 7
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            net_args = dbus.Dictionary({ 'ssid': ssid,
                                         'key_mgmt': 'WPA-PSK',
                                         'psk': passphrase,
                                         'scan_freq': 2412 },
                                       signature='sv')
            self.netw = iface.AddNetwork(net_args)
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            signals_seen = self.network_added and \
                self.network_removed and \
                self.network_selected
            if not signals_seen:
                return False
            return self.state == 7

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
958 return False
959 return self.state == 7
960
961 with TestDbusConnect(bus) as t:
962 if not t.success():
963 raise Exception("Expected signals not seen")
964
965
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Both methods are expected to fail with a NotConnected error.
    for method in ("Disconnect", "Reattach"):
        try:
            getattr(iface, method)()
        except dbus.exceptions.DBusException as e:
            if "NotConnected" not in str(e):
                raise Exception("Unexpected error message for invalid %s: " % method + str(e))
        else:
            raise Exception("%s() accepted when not connected" % method)
984
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params.update({"ssid": ssid, "ieee8021x": "1"})
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # Connects, logs off EAP, and reconnects. self.state goes 0 -> 3 as
        # State changes are seen; success() also needs a Certification
        # signal and an EAP completion/success signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            self.state = 0

        def __enter__(self):
            # Schedule the first step and the timeout handler, hook up the
            # signals, and block in the main loop until it is quit.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in ((self.propertiesChanged,
                                     "PropertiesChanged"),
                                    (self.certification, "Certification"),
                                    (self.networkRequest, "NetworkRequest"),
                                    (self.eap, "EAP")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')
            if new_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                elif self.state == 2:
                    self.state = 3
                    self.loop.quit()
            elif new_state == "disconnected" and self.state == 1:
                self.state = 2
                iface.EAPLogon()
                iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if (status, parameter) == ('completion', 'success'):
                self.eap_status = True

        def networkRequest(self, path, field, txt):
            # The network is added without a password, so answer the
            # PASSWORD request when it arrives.
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field == "PASSWORD":
                iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            # One-shot timeout callback: add the EAP network and select it.
            logger.debug("run_connect")
            network = dbus.Dictionary({ 'ssid': ssid,
                                        'key_mgmt': 'IEEE8021X',
                                        'eapol_flags': 0,
                                        'eap': 'TTLS',
                                        'anonymous_identity': 'ttls',
                                        'identity': 'pap user',
                                        'ca_cert': 'auth_serv/ca.pem',
                                        'phase2': 'auth=PAP',
                                        'scan_freq': 2412 },
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 3

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1069
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Add and remove a network; further operations on the removed object
    # path must then fail with NetworkUnknown.
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'WPA-PSK',
                             'psk': "12345678",
                             'identity': dbus.ByteArray([ 1, 2 ]),
                             'priority': dbus.Int32(0),
                             'scan_freq': dbus.UInt32(2412) },
                           signature='sv')
    netw = iface.AddNetwork(args)
    iface.RemoveNetwork(netw)
    try:
        iface.RemoveNetwork(netw)
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    else:
        raise Exception("Invalid RemoveNetwork() accepted")
    try:
        iface.SelectNetwork(netw)
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            # This message previously named RemoveNetwork by mistake.
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))
    else:
        raise Exception("Invalid SelectNetwork() accepted")

    # Two networks added -> the Networks property must list exactly two.
    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: initial value, toggling, and wrong-type rejection.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
    else:
        raise Exception("Invalid Set(Enabled,1) accepted")

    # Setting Properties with only ssid must leave identity untouched.
    args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # A second call with no networks left is expected to be accepted too.
    iface.RemoveAllNetworks()

    # Each of these argument sets must be rejected with InvalidArgs.
    tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
              dbus.Dictionary({ 'identity': dbus.ByteArray() },
                              signature='sv'),
              dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
              dbus.Dictionary({ 'identity': "" }, signature='sv') ]
    for args in tests:
        try:
            iface.AddNetwork(args)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
        else:
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
1163
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    # Create an interface on 'lo' and check that GetInterface() returns
    # the same object path.
    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # Creating the same interface a second time must fail.
    params = dbus.Dictionary({ 'Ifname': 'lo',
                               'Driver': 'none',
                               'ConfigFile': 'foo',
                               'BridgeIfname': 'foo', },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # Removing twice: the second attempt must report InterfaceUnknown.
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
    else:
        raise Exception("Invalid RemoveInterface() accepted")

    # Unknown dictionary key.
    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
                               'Foo': 123 },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # Missing Ifname.
    params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # The interface was removed above, so it must no longer be found.
    try:
        wpas.GetInterface("lo")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            # This message previously named RemoveInterface by mistake.
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
    else:
        raise Exception("Invalid GetInterface() accepted")
1221
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/RemoveBlob/GetBlob parameters and error cases"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Adding a blob under an existing name must fail with BlobExists.
    blob = dbus.ByteArray("\x01\x02\x03")
    iface.AddBlob('blob1', blob)
    try:
        iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
    else:
        raise Exception("Invalid AddBlob() accepted")

    # The stored data must match what was added, byte for byte.
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for received, expected in zip(res, blob):
        if received != dbus.Byte(expected):
            raise Exception("Unexpected blob data")
    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal, both RemoveBlob() and GetBlob() must report BlobUnknown.
    iface.RemoveBlob('blob1')
    try:
        iface.RemoveBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    else:
        raise Exception("Invalid RemoveBlob() accepted")
    try:
        iface.GetBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
    else:
        raise Exception("Invalid GetBlob() accepted")

    class TestDbusBlob(TestDbus):
        # Checks that BlobAdded and BlobRemoved signals are emitted for
        # 'blob2'; the main loop is quit when the removal signal arrives.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                self.loop.quit()

        def run_blob(self, *args):
            # One-shot timeout callback: add and immediately remove blob2.
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1296
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wpas_if = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Only checks that each call returns without a D-Bus exception.
    for autoscan_arg in ("foo", "periodic:1", ""):
        wpas_if.AutoScan(autoscan_arg)
    # Same empty setting once more through the control interface.
    dev[0].request("AUTOSCAN ")
1306
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)
    addr1 = dev[1].p2p_interface_addr()

    def expect_invalid_args(method):
        # Call the named TDLS method with a bogus peer address and require
        # an InvalidArgs D-Bus error.
        try:
            getattr(iface, method)("foo")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid %s: " % method + str(e))
        else:
            raise Exception("Invalid %s() accepted" % method)

    expect_invalid_args("TDLSDiscover")
    expect_invalid_args("TDLSStatus")

    # No TDLS link has been set up with dev[1], so a valid address is
    # expected to report that the peer does not exist.
    status = iface.TDLSStatus(addr1)
    if status != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    expect_invalid_args("TDLSSetup")
    expect_invalid_args("TDLSTeardown")
1347
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Runs discover -> setup -> status -> teardown -> status as a chain
        # of one-shot GLib timeouts; each step schedules the next one.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Logged only; the test does not act on property changes.
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            # Teardown is requested regardless of the status seen above.
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1414
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # With a non-empty engine path the call may succeed or fail; only the
    # error text of a failure is checked.
    for engine, module in (("foo", "bar"), ("foo", "")):
        try:
            iface.SetPKCS11EngineAndModulePath(engine, module)
        except dbus.exceptions.DBusException as e:
            if "Error.Failed: Reinit of the EAPOL" not in str(e):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))

    # With an empty engine path the call must succeed and both properties
    # must read back the values that were set.
    for module in ("bar", ""):
        iface.SetPKCS11EngineAndModulePath("", module)
        for prop, expected in (("PKCS11EnginePath", ""),
                               ("PKCS11ModulePath", module)):
            value = if_obj.Get(WPAS_DBUS_IFACE, prop,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            if value != expected:
                raise Exception("Unexpected %s value: " % prop + value)
1451
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    # AP_SCAN is reset to 1 whether or not the test body raised.
    try:
        result = _test_dbus_apscan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")
    return result
1458
def _test_dbus_apscan(dev, apdev):
    # Body of test_dbus_apscan: checks the initial ApScan value, the
    # accepted values 0..2, and rejection of a wrong type and an
    # out-of-range value.
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])

    def read_ap_scan():
        return if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_ap_scan(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    res = read_ap_scan()
    if res != 1:
        raise Exception("Unexpected initial ApScan value: %d" % res)

    for i in range(3):
        write_ap_scan(dbus.UInt32(i))
        res = read_ap_scan()
        if res != i:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i))

    try:
        write_ap_scan(dbus.Int16(-1))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,-1) accepted")

    try:
        write_ap_scan(dbus.UInt32(123))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,123) accepted")

    # Leave the property at the value that was checked as the initial one.
    write_ap_scan(dbus.UInt32(1))
1493
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # The property is expected to start out enabled.
    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values must be accepted and read back unchanged.
    for i in [ False, True ]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A non-boolean value must be rejected as a wrong property type.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            # This message previously named ApScan by mistake.
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))
    else:
        raise Exception("Invalid Set(FastReauth,-1) accepted")

    # Restore the value that was checked as the initial one.
    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
1521
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # Valid values must be accepted and read back unchanged. The expected
    # value is held in a local so the failure message can format it (the
    # message previously referenced an undefined name and would have
    # raised NameError instead).
    expected_age = 179
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(expected_age),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected_age:
        raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, expected_age))

    expected_count = 3
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(expected_count),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected_count:
        raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, expected_count))

    # Wrong type and too-small value for BSSExpireAge.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))
    else:
        raise Exception("Invalid Set(BSSExpireAge,-1) accepted")

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))
    else:
        raise Exception("Invalid Set(BSSExpireAge,9) accepted")

    # Wrong type and zero for BSSExpireCount.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))
    else:
        raise Exception("Invalid Set(BSSExpireCount,-1) accepted")

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))
    else:
        raise Exception("Invalid Set(BSSExpireCount,0) accepted")

    # Set both properties to 180 / 2 before leaving (presumably the
    # defaults - not visible from this file).
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
1576
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    # The country setting is reset to "00" both in wpa_supplicant and via
    # iw, whether or not the test body raised.
    try:
        result = _test_dbus_country(dev, apdev)
    finally:
        dev[0].request("SET country 00")
        subprocess.call(['iw', 'reg', 'set', '00'])
    return result
1584
def _test_dbus_country(dev, apdev):
    # Body of test_dbus_country: sets Country to FI and back to 00, checking
    # the regdom change event each time, and verifies that invalid values
    # are rejected.
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    sta = dev[0]

    def write_country(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Country", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def expect_regdom_event(fragment):
        event = sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if event is None:
            raise Exception("regdom change event not seen")
        if fragment not in event:
            raise Exception("Unexpected event contents: " + event)

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    sta.dump_monitor()

    write_country("FI")
    res = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % res)

    expect_regdom_event("init=USER type=COUNTRY alpha2=FI")

    try:
        write_country(dbus.Int16(-1))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
    else:
        raise Exception("Invalid Set(Country,-1) accepted")

    try:
        write_country("F")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: invalid country code" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
    else:
        raise Exception("Invalid Set(Country,F) accepted")

    write_country("00")

    expect_regdom_event("init=CORE type=WORLD")
1630
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    # SCAN_INTERVAL is reset to 5 whether or not the test body raised.
    sta = dev[0]
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        sta.request("SCAN_INTERVAL 5")
1637
def _test_dbus_scan_interval(dev, apdev):
    # Body of test_dbus_scan_interval: checks that a valid ScanInterval is
    # read back unchanged and that a wrong type and a negative value are
    # rejected.
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # The expected value is held in a local so the failure message can
    # format it (the message previously referenced an undefined name and
    # would have raised NameError instead).
    expected = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(expected),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    # An unsigned 16-bit value is the wrong D-Bus type for this property.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
    else:
        raise Exception("Invalid Set(ScanInterval,100) accepted")

    # Right type, but negative values must be rejected.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ScanInterval,-1) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
1666
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # dev[0] runs as a P2P GO and dev[1] runs a P2P find on the social
    # channels (presumably the source of the Probe Requests reported below).
    dev[0].p2p_start_go(freq=2412)
    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        # Subscribes to Probe Request reporting and quits the main loop as
        # soon as the first ProbeRequest signal is delivered.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            iface.SubscribeProbeReq()
            self.loop.run()
            return self

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            self.loop.quit()

        def run_test(self, *args):
            # Nothing to trigger here; returning False makes it one-shot.
            logger.debug("run_test")
            return False

        def success(self):
            return self.reported

    with TestDbusProbe(bus) as probe:
        if not probe.success():
            raise Exception("Expected signals not seen")
        iface.UnsubscribeProbeReq()

    # A second unsubscribe without a subscription must be rejected.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException as e:
        if "NoSubscription" not in str(e):
            raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
    else:
        raise Exception("Invalid UnsubscribeProbeReq() accepted")

    with TestDbusProbe(bus) as probe:
        if not probe.success():
            raise Exception("Expected signals not seen")
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
    dev[0].remove_group()
1721
def test_dbus_p2p_invalid(dev, apdev):
    """D-Bus invalid P2P operations"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def expect_dbus_error(call, expected, name, err_name=None,
                          p2p_disabled=False):
        # Invoke call() and require it to fail with a DBusException whose
        # text contains 'expected'. 'name' labels the operation in the
        # "Invalid ... accepted" message and 'err_name' (defaults to 'name')
        # labels it in the "Unexpected error message" message. With
        # p2p_disabled=True, "P2P_SET disabled 1" is issued on dev[0] before
        # the call and "P2P_SET disabled 0" is always issued afterwards.
        if err_name is None:
            err_name = name
        try:
            if p2p_disabled:
                dev[0].request("P2P_SET disabled 1")
            call()
            # Not a DBusException, so this propagates past the handler below
            raise Exception("Invalid " + name + " accepted")
        except dbus.exceptions.DBusException as e:
            if expected not in str(e):
                raise Exception("Unexpected error message for invalid " +
                                err_name + ": " + str(e))
        finally:
            if p2p_disabled:
                dev[0].request("P2P_SET disabled 0")

    no_p2p = "Error.Failed: P2P is not available for this interface"

    # RejectPeer: a peer object path that does not exist and a path that is
    # not a peer object path at all
    expect_dbus_error(lambda: p2p.RejectPeer(path + "/Peers/00112233445566"),
                      "UnknownError: Failed to call wpas_p2p_reject",
                      "RejectPeer", "RejectPeer()")
    expect_dbus_error(lambda: p2p.RejectPeer("/foo"), "InvalidArgs",
                      "RejectPeer", "RejectPeer()")

    # Find: invalid values, invalid value types, and unknown keys
    tests = [ {'DiscoveryType': 'foo'},
              {'RequestedDeviceTypes': 'foo'},
              {'RequestedDeviceTypes': ['foo']},
              {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
                                        '10','11','12','13','14','15','16',
                                        '17']},
              {'RequestedDeviceTypes': dbus.Array([], signature="s")},
              {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
              {'RequestedDeviceTypes': dbus.Array([], signature="i")},
              {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
                                        dbus.ByteArray('1234567')]},
              {'Foo': dbus.Int16(1)},
              {'Foo': dbus.UInt16(1)},
              {'Foo': dbus.Int64(1)},
              {'Foo': dbus.UInt64(1)},
              {'Foo': dbus.Double(1.23)},
              {'Foo': dbus.Signature('s')},
              {'Foo': 'bar'}]
    for t in tests:
        expect_dbus_error(lambda: p2p.Find(dbus.Dictionary(t)),
                          "InvalidArgs", "Find", "Find()")

    for p in [ "/foo",
               "/fi/w1/wpa_supplicant1/Interfaces/1234",
               "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
        expect_dbus_error(lambda: p2p.RemovePersistentGroup(dbus.ObjectPath(p)),
                          "InvalidArgs", "RemovePersistentGroup")

    expect_dbus_error(lambda: p2p.Listen(5),
                      "UnknownError: Could not start P2P listen",
                      "Listen", p2p_disabled=True)

    # Proxy created with introspect=False so that a string argument can be
    # passed to Listen()
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    expect_dbus_error(lambda: test_p2p.Listen("foo"), "InvalidArgs", "Listen")

    expect_dbus_error(lambda: p2p.ExtendedListen(dbus.Dictionary({})),
                      "UnknownError: failed to initiate a p2p_ext_listen",
                      "ExtendedListen", p2p_disabled=True)

    presence_args = { 'duration1': 30000, 'interval1': 102400,
                      'duration2': 20000, 'interval2': 102400 }
    expect_dbus_error(lambda: p2p.PresenceRequest(presence_args),
                      "UnknownError: Failed to invoke presence request",
                      "PresenceRequest", p2p_disabled=True)

    expect_dbus_error(
        lambda: p2p.GroupAdd(dbus.Dictionary({'frequency': dbus.Int32(-1)})),
        "InvalidArgs", "GroupAdd")

    # The interface path is not a persistent group object path
    expect_dbus_error(
        lambda: p2p.GroupAdd(dbus.Dictionary({'persistent_group_object':
                                              dbus.ObjectPath(path),
                                              'frequency': 2412})),
        "InvalidArgs", "GroupAdd")

    expect_dbus_error(lambda: p2p.Disconnect(),
                      "UnknownError: failed to disconnect", "Disconnect")

    expect_dbus_error(lambda: p2p.Flush(), no_p2p, "Flush",
                      p2p_disabled=True)

    connect_args = { 'peer': path,
                     'join': True,
                     'wps_method': 'pbc',
                     'frequency': 2412 }
    expect_dbus_error(lambda: p2p.Connect(connect_args), no_p2p, "Connect",
                      p2p_disabled=True)

    tests = [ { 'frequency': dbus.Int32(-1) },
              { 'wps_method': 'pbc' },
              { 'wps_method': 'foo' } ]
    for args in tests:
        expect_dbus_error(lambda: p2p.Connect(args), "InvalidArgs", "Connect")

    expect_dbus_error(lambda: p2p.Invite({ 'peer': path }), no_p2p, "Invite",
                      p2p_disabled=True)

    # This case previously reported itself as "invalid Connect"
    expect_dbus_error(lambda: p2p.Invite({ 'foo': 'bar' }), "InvalidArgs",
                      "Invite")

    unknown_peer = dbus.ObjectPath(path + "/Peers/00112233445566")
    pd_failed = "UnknownError: Failed to send provision discovery request"
    tests = [ (path, 'display', "InvalidArgs"),
              (unknown_peer, 'display', pd_failed),
              (unknown_peer, 'keypad', pd_failed),
              (unknown_peer, 'pbc', pd_failed),
              (unknown_peer, 'pushbutton', pd_failed),
              (unknown_peer, 'foo', "InvalidArgs") ]
    for (p,method,err) in tests:
        expect_dbus_error(lambda: p2p.ProvisionDiscoveryRequest(p, method),
                          err, "ProvisionDiscoveryRequest")

    expect_dbus_error(lambda: if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                                         dbus_interface=dbus.PROPERTIES_IFACE),
                      no_p2p, "Get(Peers)", p2p_disabled=True)
1930
def test_dbus_p2p_discovery(dev, apdev):
    """D-Bus P2P discovery"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # dev[1] listens with a secondary device type and an added vendor element
    dev[1].request("SET sec_device_type 1-0050F204-2")
    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()
    a1 = binascii.unhexlify(addr1.replace(':',''))

    # dev[2] listens with a Wi-Fi Display subelement configured
    wfd_devinfo = "00001c440028"
    dev[2].request("SET wifi_display 1")
    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
    wfd = binascii.unhexlify('000006' + wfd_devinfo)
    dev[2].p2p_listen()
    addr2 = dev[2].p2p_dev_addr()
    a2 = binascii.unhexlify(addr2.replace(':',''))

    # No peers are expected before any Find() has been issued
    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if 'Peers' not in res:
        raise Exception("GetAll result missing Peers")
    if len(res['Peers']) != 0:
        raise Exception("Unexpected peer(s) in the list")

    # Start and immediately stop a Find() that uses all supported parameters
    args = {'DiscoveryType': 'social',
            'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
            'Timeout': dbus.Int32(1) }
    p2p.Find(dbus.Dictionary(args))
    p2p.StopFind()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False   # dev[1] seen with the expected properties
            self.found2 = False  # dev[2] seen with the expected properties
            self.lost = False    # DeviceLost signal seen

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceLost")
            self.add_signal(self.provisionDiscoveryResponseEnterPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryResponseEnterPin")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if len(res) < 1:
                raise Exception("Unexpected number of peers")
            if path not in res:
                raise Exception("Mismatch in peer object path")
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("peer properties: " + str(res))

            if res['DeviceAddress'] == a1:
                if 'SecondaryDeviceTypes' not in res:
                    raise Exception("Missing SecondaryDeviceTypes")
                sec = res['SecondaryDeviceTypes']
                if len(sec) < 1:
                    raise Exception("Secondary device type missing")
                if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
                    raise Exception("Secondary device type mismatch")

                if 'VendorExtension' not in res:
                    raise Exception("Missing VendorExtension")
                vendor = res['VendorExtension']
                if len(vendor) < 1:
                    raise Exception("Vendor extension missing")
                if "\x11\x22\x33\x44" not in vendor:
                    # This check previously reported a secondary device type
                    # mismatch
                    raise Exception("Vendor extension mismatch")

                self.found = True
            elif res['DeviceAddress'] == a2:
                if 'IEs' not in res:
                    raise Exception("IEs missing")
                if res['IEs'] != wfd:
                    raise Exception("IEs mismatch")
                self.found2 = True
            else:
                raise Exception("Unexpected peer device address")

            if self.found and self.found2:
                # Both peers verified: stop the search and exercise
                # RejectPeer and ProvisionDiscoveryRequest on the peer that
                # was reported last
                p2p.StopFind()
                p2p.RejectPeer(path)
                p2p.ProvisionDiscoveryRequest(path, 'display')

        def deviceLost(self, path):
            logger.debug("deviceLost: path=%s" % path)
            self.lost = True
            # RejectPeer on the peer that was just lost is expected to fail
            try:
                p2p.RejectPeer(path)
                raise Exception("Invalid RejectPeer accepted")
            except dbus.exceptions.DBusException as e:
                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
            self.loop.quit()

        def provisionDiscoveryResponseEnterPin(self, peer_object):
            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
            p2p.Flush()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            # Returning False keeps the timeout callback from being re-armed
            return False

        def success(self):
            return self.found and self.lost and self.found2

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
    dev[1].p2p_stop_find()

    # Verify that dev[2] can discover dev[0] after Listen() over D-Bus
    p2p.Listen(1)
    dev[2].p2p_stop_find()
    dev[2].request("P2P_FLUSH")
    if not dev[2].discover_peer(addr0):
        raise Exception("Peer not found")
    p2p.StopFind()
    dev[2].p2p_stop_find()

    try:
        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))

    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
    p2p.ExtendedListen(dbus.Dictionary({}))
    dev[0].global_request("P2P_EXT_LISTEN")
2080
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # NOTE(review): addr0/addr1 are not used below; the calls are kept since
    # their side effects cannot be determined from this function alone.
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    def expect_invalid_args(call, name, err_name):
        # Invoke call() and require it to fail with a DBusException whose
        # text contains "InvalidArgs". 'name' labels the operation in the
        # "Invalid ... accepted" message and 'err_name' labels it in the
        # "Unexpected error message" message.
        try:
            call()
            # Not a DBusException, so this propagates past the handler below
            raise Exception("Invalid " + name + " accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid " +
                                err_name + ": " + str(e))

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))

    args = { 'service_type': 'bonjour',
             'query': bonjour_query,
             'response': bonjour_response }
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # DeleteService with the AddService arguments (including 'response') is
    # expected to be rejected
    expect_invalid_args(lambda: p2p.DeleteService(args),
                        "DeleteService()", "DeleteService()")

    args = { 'service_type': 'bonjour',
             'query': bonjour_query }
    p2p.DeleteService(args)
    # Deleting the same service a second time is expected to fail
    expect_invalid_args(lambda: p2p.DeleteService(args),
                        "DeleteService()", "DeleteService()")

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
    p2p.AddService(args)
    p2p.DeleteService(args)
    expect_invalid_args(lambda: p2p.DeleteService(args),
                        "DeleteService()", "DeleteService()")

    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'foo', 'query': bonjour_query },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'version': 0x10,
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(lambda: p2p.DeleteService(args),
                            "DeleteService()", "DeleteService()")

    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'version': 0x10,
                'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'response': 'foo' },
              { 'service_type': 'bonjour', 'query': bonjour_query },
              { 'service_type': 'bonjour', 'response': bonjour_response },
              { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(lambda: p2p.AddService(args),
                            "AddService()", "AddService()")

    args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    # Cancelling an already cancelled request and cancelling reference 0 are
    # both expected to fail. These two checks previously reported themselves
    # as "invalid AddService()".
    expect_invalid_args(lambda: p2p.ServiceDiscoveryCancelRequest(ref),
                        "ServiceDiscoveryCancelRequest()",
                        "ServiceDiscoveryCancelRequest()")
    expect_invalid_args(
        lambda: p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0)),
        "ServiceDiscoveryCancelRequest()",
        "ServiceDiscoveryCancelRequest()")

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'ssdp:foo' }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    tests = [ { 'service_type': 'foo' },
              { 'foo': 'bar' },
              { 'tlv': 'foo' },
              { },
              { 'version': 0 },
              { 'service_type': 'upnp',
                'service': 'ssdp:foo' },
              { 'service_type': 'upnp',
                'version': 0x10 },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers") },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': path + "/Peers" },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
    for args in tests:
        expect_invalid_args(lambda: p2p.ServiceDiscoveryRequest(args),
                            "ServiceDiscoveryRequest",
                            "ServiceDiscoveryRequest()")

    args = { 'foo': 'bar' }
    expect_invalid_args(
        lambda: p2p.ServiceDiscoveryResponse(dbus.Dictionary(args,
                                                             signature='sv')),
        "ServiceDiscoveryResponse", "ServiceDiscoveryResponse()")
2229
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    # dev[1] registers a Bonjour service and goes to listen state so that it
    # can be found and queried by dev[0]
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            super(TestDbusP2p, self).__init__(bus)
            # Set once a ServiceDiscoveryResponse signal has been received
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Send an SD query (raw TLV) to the peer that was just found
            query_tlv = dbus.ByteArray("\x02\x00\x00\x01")
            p2p.ServiceDiscoveryRequest({ 'peer_object': path,
                                          'tlv': query_tlv })

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = {'DiscoveryType': 'social',
                           'Timeout': dbus.Int32(10)}
            p2p.Find(dbus.Dictionary(find_params))
            # Returning False keeps the timeout callback from being re-armed
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
2281
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        # Issued whether the test body passed or raised, so that external
        # service discovery processing is turned off again on dev[0]
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
2288
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hexdump of the SD response TLVs that dev[0] returns over D-Bus and that
    # dev[1] is expected to report in its P2P-SERV-DISC-RESP event
    resp = "0300000101"

    # dev[1] queues an SD query towards dev[0] and starts searching
    dev[1].request("P2P_FLUSH")
    dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    dev[1].p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            super(TestDbusP2p, self).__init__(bus)
            # Set once a ServiceDiscoveryRequest signal has been received
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryRequest")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Reply to the request using the values reported in the signal
            tlvs = dbus.ByteArray(binascii.unhexlify(resp))
            reply = { 'peer_object': sd_request['peer_object'],
                      'frequency': sd_request['frequency'],
                      'dialog_token': sd_request['dialog_token'],
                      'tlvs': tlvs }
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(reply,
                                                         signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            # Returning False keeps the timeout callback from being re-armed
            return False

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    # The fifth space-separated field of the event carries the response data
    fields = ev.split(' ')
    if fields[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    dev[1].p2p_stop_find()

    p2p.StopFind()
    p2p.ServiceDiscoveryExternal(0)
2355
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # True until the first GroupStarted signal has been handled
            self.first = True
            # True once the final Disconnect() has been issued, so that the
            # next GroupFinished removes the persistent group
            self.waiting_end = False
            # Set when PersistentGroupRemoved has been received
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                raise Exception("Unexpected role reported: " + role)
            group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                               dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                            dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                raise Exception("Unexpected PeerGO value: " + str(go))
            if self.first:
                self.first = False
                logger.info("Remove persistent group instance")
                p2p.Disconnect()
            else:
                # Group re-started: have wlan1 join it using a PIN
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            # The peer address comes from the properties stored by
            # deviceFound(); the textual address formerly derived from the
            # object path was never used and has been dropped.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            if 'Groups' not in res or len(res['Groups']) != 1:
                raise Exception("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                raise Exception("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                raise Exception("Unexpected number of group members")

            ext = dbus.ByteArray("\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != ext:
                raise Exception("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted as
            # well.
            res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value that was read back (res2), not the local list
            # that was just extended to two entries.
            if len(res2) != 2:
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                raise Exception("Vendor extension value changed")

            # Twelve entries in total are expected to be rejected
            for i in range(10):
                res.append(dbus.ByteArray('\xaa\xbb'))
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            # Dictionary encoding with an unexpected key
            vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            # Array of strings instead of an array of byte arrays
            vals = [ "foo" ]
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            # Array of string arrays instead of an array of byte arrays
            vals = [ [ "foo" ] ]
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            self.waiting_end = True
            p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the timeout callback from being re-armed
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
2570
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            super(TestDbusP2p, self).__init__(bus)
            self.first = True
            self.waiting_end = False
            # Set when GroupFinished has been received
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name), registered in order
            receivers = [ (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                           "DeviceFound"),
                          (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                           "GroupStarted"),
                          (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                           "GroupFinished"),
                          (self.provisionDiscoveryPBCRequest,
                           WPAS_DBUS_IFACE_P2PDEVICE,
                           "ProvisionDiscoveryPBCRequest"),
                          (self.staAuthorized, WPAS_DBUS_IFACE,
                           "StaAuthorized") ]
            for (handler, iface, signal) in receivers:
                self.add_signal(handler, iface, signal)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            # Group is up: have wlan1 join it using PBC
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            # Colon-separated form of the address in the last path component;
            # the value is not used further below.
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ':'.join('%02x' % ord(p) for p in peer)
            peer_addr = self.peer['DeviceAddress']
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': peer_addr,
                       'Bssid': peer_addr,
                       'Type': 'pbc' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # Returning False keeps the timeout callback from being re-armed
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
2657
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    # A group is added on dev[0] over D-Bus and wlan1 connects to it with
    # WPS PIN. Once StaAuthorized is seen, wlan1 is disconnected and the
    # group is removed; GroupFinished marks the test as done.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Queue the test start and the overall timeout, register the
            # handlers, and then block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            registrations = (
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            )
            for handler, iface, signal in registrations:
                self.add_signal(handler, iface, signal)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            pin = '12345670'
            # Start WPS with the PIN over D-Bus before the station uses it.
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': pin})
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(addr0, freq=2412)
            sta.request("WPS_PIN " + addr0 + " " + pin)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # Returning False keeps the timeout callback from repeating.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2717
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    # dev[1] runs a GO and dev[2] listens. dev[0] discovers both over D-Bus,
    # joins the GO with a PIN, checks the reported properties, and invites
    # the other peer. GroupFinished marks the test as done.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev[2].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None

        def __enter__(self):
            # Queue the test start and the overall timeout, register the
            # handlers, and then block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.deviceFound, "DeviceFound"),
                        (self.groupStarted, "GroupStarted"),
                        (self.groupFinished, "GroupFinished"),
                        (self.invitationResult, "InvitationResult"))
            for handler, signal in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            found = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = found.GetAll(WPAS_DBUS_P2P_PEER,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # The object path contains the peer address without colons.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path
            if not (self.peer and self.go):
                # Keep waiting until both devices have been found.
                return

            logger.info("Join the group")
            p2p.StopFind()
            # Connect is called without a 'pin' entry; its return value is
            # used as the PIN handed to wlan1 below.
            pin = p2p.Connect({'peer': self.go,
                               'join': True,
                               'wps_method': 'pin',
                               'frequency': 2412})

            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            props_iface = dbus.PROPERTIES_IFACE
            group_path = properties['group_object']

            role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                              dbus_interface=props_iface)
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                               dbus_interface=props_iface)
            if group != group_path:
                raise Exception("Unexpected Group reported: " + str(group))
            go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                            dbus_interface=props_iface)
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, group_path)
            res = g_obj.GetAll(WPAS_DBUS_GROUP, dbus_interface=props_iface,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            # NOTE(review): ext is created but never used; the Set() call
            # below passes the GetAll() result (res) instead - confirm
            # whether that is intentional.
            ext = dbus.ByteArray("\x11\x22\x33\x44")
            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=props_iface)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            p2p.PresenceRequest({'duration1': 30000, 'interval1': 102400,
                                 'duration2': 20000, 'interval2': 102400})

            p2p.Invite({'peer': self.peer})

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Remove the group on wlan1 once the invitation result is in.
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from repeating.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
2836
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    # Run the actual test steps in the helper so that the P2P_SET command
    # below is issued on dev[0] whether the helper returns or raises.
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        dev[0].request("P2P_SET ssid_postfix ")
2843
def _test_dbus_p2p_config(dev, apdev):
    # Exercise Get/Set of the P2PDeviceConfig property on dev[0]:
    #  1. write back the values that were read and check nothing changed
    #  2. set SsidPostfix, VendorExtension and SecondaryDeviceTypes
    #  3. clear them again and check the two arrays are no longer reported
    #  4. check Get and Set fail while P2P is disabled
    #  5. check invalid dictionaries are rejected with InvalidArgs
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def get_config():
        # Read the current P2PDeviceConfig dictionary.
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(values):
        # Write a P2PDeviceConfig dictionary.
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", values,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    res = get_config()
    set_config(res)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        # A key missing from the second read is reported as a changed
        # parameter instead of surfacing as a bare KeyError.
        if k not in res2 or res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    changes = { 'SsidPostfix': 'foo',
                'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
                'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    changes = { 'SsidPostfix': '',
                'VendorExtension': dbus.Array([], signature="ay"),
                'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
    set_config(dbus.Dictionary(changes, signature='sv'))

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Get must fail while P2P is disabled; P2P is always re-enabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Set must fail while P2P is disabled; P2P is always re-enabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        changes = { 'SsidPostfix': 'foo' }
        set_config(dbus.Dictionary(changes, signature='sv'))
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Each of these dictionaries is expected to be rejected.
    tests = [ { 'DeviceName': 123 },
              { 'SsidPostfix': 123 },
              { 'Foo': 'Bar' } ]
    for changes in tests:
        try:
            set_config(dbus.Dictionary(changes, signature='sv'))
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
2932
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    # A persistent group is added on dev[0] over D-Bus and disconnected as
    # soon as it has started. The persistent group object reported by
    # PersistentGroupAdded is then used to exercise the Properties property,
    # AddPersistentGroup, RemoveAllPersistentGroups and RemovePersistentGroup.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path from PersistentGroupAdded; stays None when the
            # signal is never received.
            self.persistent = None

        def __enter__(self):
            # Queue the test start and the overall timeout, register the
            # handlers, and then block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the timeout callback from repeating.
            return False

        def success(self):
            # The rest of the test needs the persistent group object, so a
            # missing PersistentGroupAdded signal is reported as a failure
            # here instead of as an AttributeError on t.persistent.
            return self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    # Change only the ssid; every other parameter must stay unchanged.
    vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    # The ssid value is compared including the surrounding double quotes.
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
                             'psk': '1234567890' }, signature='sv')
    group = p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The group was already removed above, so this call must be rejected.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
3022
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # A persistent group is added on dev[0] over D-Bus and wlan1 joins it
    # with a PIN. After the first GroupFinished the peer is invited again
    # using the persistent group object; the second GroupFinished marks the
    # test as done.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False
            self.invited = False

        def __enter__(self):
            # Queue the test start and the overall timeout, register the
            # handlers, and then block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            p2p_signals = ((self.deviceFound, "DeviceFound"),
                           (self.groupStarted, "GroupStarted"),
                           (self.groupFinished, "GroupFinished"),
                           (self.persistentGroupAdded, "PersistentGroupAdded"),
                           (self.provisionDiscoveryRequestDisplayPin,
                            "ProvisionDiscoveryRequestDisplayPin"))
            for handler, signal in p2p_signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal)
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            if self.invited:
                # Nothing to trigger for the group started by the invite.
                return
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.scan_for_bss(addr0, freq=2412)
            joiner.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                self.done = True
                self.loop.quit()
                return

            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.request("SET persistent_reconnect 1")
            joiner.p2p_listen()

            # First try with the interface object path in place of a
            # persistent group object; this must be rejected.
            bad_args = {'persistent_group_object': dbus.ObjectPath(path),
                        'peer': self.peer_path}
            try:
                pin = p2p.Invite(bad_args)
                raise Exception("Invalid Invite accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Invite: " + str(e))

            # Now invite with the object reported by PersistentGroupAdded.
            good_args = {'persistent_group_object': self.persistent,
                         'peer': self.peer_path}
            pin = p2p.Invite(good_args)
            self.invited = True

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Remember the peer properties; DeviceAddress is needed when
            # the peer is authorized.
            found = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = found.GetAll(WPAS_DBUS_P2P_PEER,
                                     dbus_interface=dbus.PROPERTIES_IFACE,
                                     byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            # Decode the last component of the object path into a
            # colon-separated address string; the value is not used below.
            raw = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ':'.join(['%02x' % ord(octet) for octet in raw])
            params = {
                'Role': 'registrar',
                'P2PDeviceAddress': self.peer['DeviceAddress'],
                'Bssid': self.peer['DeviceAddress'],
                'Type': 'pin',
                'Pin': '12345670',
            }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # Wait for wlan1 to report the group removal before
            # disconnecting over D-Bus.
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.remove_group()
            if joiner.wait_event(["P2P-GROUP-REMOVED"], timeout=10) is None:
                raise Exception("Group removal timed out")
            p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            group_params = dbus.Dictionary({'persistent': True,
                                            'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(group_params)
            # Returning False keeps the timeout callback from repeating.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3141
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # dev[0] listens over D-Bus while wlan1 starts GO Negotiation with it.
    # The request is answered with Connect, the group is disconnected once
    # it has started, and GroupFinished marks the test as done.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Queue the test start and the overall timeout, register the
            # handlers, and then block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.deviceFound, iface, "DeviceFound")
            self.add_signal(self.goNegotiationRequest, iface,
                            "GONegotiationRequest", byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess, iface,
                            "GONegotiationSuccess", byte_arrays=True)
            self.add_signal(self.groupStarted, iface, "GroupStarted")
            self.add_signal(self.groupFinished, iface, "GroupFinished")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d" % (path, dev_passwd_id))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            connect_args = {'peer': path, 'wps_method': 'display',
                            'pin': '12345670', 'go_intent': 15,
                            'persistent': False}
            # The same request with frequency 5175 added must be rejected.
            try:
                p2p.Connect(dict(connect_args, frequency=5175))
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            p2p.Connect(connect_args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            initiator = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not initiator.discover_peer(addr0):
                raise Exception("Peer not found")
            initiator.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            # Returning False keeps the timeout callback from repeating.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3220
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # dev[0] finds the listening dev[1] over D-Bus and calls Connect with
    # authorize_only set; wlan1 then starts GO Negotiation. The test passes
    # when GroupFinished, PeerJoined and PeerDisconnected have all been
    # received.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            # Queue the test start and the overall timeout, register the
            # handlers, and then block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Keypad method without a 'pin' entry must be rejected.
            args = { 'peer': path, 'wps_method': 'keypad',
                     'go_intent': 15, 'authorize_only': True }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 15, 'authorize_only': True }
            p2p.Connect(args)
            p2p.Listen(10)
            # wlan1 starts the negotiation with go_intent=0.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.remove_group()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from repeating.
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3316
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # dev[0] finds the listening dev[1] over D-Bus and starts GO Negotiation
    # with go_intent 0; wlan1 answers with go_intent=15. Both sides leave
    # the group once it has started and GroupFinished marks the test as done.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Queue the test start and the overall timeout, register the
            # handlers, and then block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.deviceFound, iface, "DeviceFound")
            self.add_signal(self.goNegotiationSuccess, iface,
                            "GONegotiationSuccess", byte_arrays=True)
            self.add_signal(self.groupStarted, iface, "GroupStarted")
            self.add_signal(self.groupFinished, iface, "GroupFinished")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The wlan1 control object is created before Connect is called
            # and is then used to wait for the resulting events.
            responder = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            p2p.Connect({'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670', 'go_intent': 0})

            if responder.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                           timeout=15) is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            responder.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            if responder.wait_global_event(["P2P-GROUP-STARTED"],
                                           timeout=15) is None:
                raise Exception("Group formation timed out")

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            p2p.Disconnect()
            responder = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            responder.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from repeating.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3385
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    # wlan1 starts GO Negotiation with PIN 87654321 while dev[0] answers over
    # D-Bus with PIN 12345670. The test passes when WpsFailed is received;
    # a GroupStarted signal is treated as an error.
    (dbus, bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Queue the test start and the overall timeout, register the
            # handlers, and then block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.goNegotiationRequest, iface,
                            "GONegotiationRequest", byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess, iface,
                            "GONegotiationSuccess", byte_arrays=True)
            self.add_signal(self.groupStarted, iface, "GroupStarted")
            self.add_signal(self.wpsFailed, iface, "WpsFailed")
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d" % (path, dev_passwd_id))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            p2p.Connect({'peer': path, 'wps_method': 'display',
                         'pin': '12345670', 'go_intent': 15})

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            initiator = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not initiator.discover_peer(addr0):
                raise Exception("Peer not found")
            initiator.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            # Returning False keeps the timeout callback from repeating.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
3450
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    # Flow, as driven by the signal handlers below (all D-Bus calls go to the
    # wpa_supplicant interface object of dev[0]):
    #   1. dev[1] is started with p2p_start_go() on 2412 and dev[0] issues a
    #      P2P Find.
    #   2. When a DeviceFound path matches dev[1]'s address, dev[0] joins that
    #      group using a WPS PIN (first GroupStarted -> group1).
    #   3. On the first GroupStarted, GroupAdd is called on dev[0]
    #      (second GroupStarted -> group2).
    #   4. With both groups recorded, WPS is started on group2 for dev[2]'s
    #      address and wlan2 is told to P2P_CONNECT ... join.
    #   5. PeerJoined triggers the property checks and tears both groups down;
    #      the main loop quits once GroupFinished was seen for both.
    dbus, bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # D-Bus object paths learned from DeviceFound.
            self.peer = None
            self.go = None
            # Group object paths learned from GroupStarted; reset to None
            # again by groupFinished().
            self.group1 = None
            self.group2 = None

        def __enter__(self):
            # run_test fires almost immediately; self.timeout comes from the
            # TestDbus base class (not visible in this part of the file) and
            # is scheduled 15000 ms out.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)

            # (handler, D-Bus interface, signal name, byte_arrays)
            receivers = (
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", True),
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", False),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", False),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", False),
                (self.peerJoined, WPAS_DBUS_GROUP,
                 "PeerJoined", False),
            )
            for handler, iface, signal_name, as_bytes in receivers:
                self.add_signal(handler, iface, signal_name,
                                byte_arrays=as_bytes)

            # Blocks until one of the handlers (or the timeout) stops the loop.
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            # Logging only; no test state depends on this signal.
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The peer object path is matched against the device address
            # with the colons removed.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path

            # Nothing to do until the GO is known; also skip once the first
            # group has been recorded.
            if not self.go or self.group1:
                return

            logger.info("Join the group")
            p2p.StopFind()
            pin = '12345670'
            # Separate control interface handle for wlan1 to register the PIN
            # on the GO side.
            go_ctrl = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_ctrl.request("WPS_PIN any " + pin)
            p2p.Connect({'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'pin': pin,
                         'frequency': 2412})

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            p2pdev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                         dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(p2pdev_props))

            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            logger.debug("Group properties: " + str(group_props))

            if not self.group1:
                # First group: remember it and request a second group on
                # the same frequency.
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            elif not self.group2:
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = group_props['BSSID']

            if not (self.group1 and self.group2):
                return

            logger.info("Authorize peer to join the group")
            peer_addr = binascii.unhexlify(addr2.replace(':', ''))
            wps_params = {'Role': 'enrollee',
                          'P2PDeviceAddress': dbus.ByteArray(peer_addr),
                          'Bssid': dbus.ByteArray(peer_addr),
                          'Type': 'pin',
                          'Pin': '12345670'}
            dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS).Start(wps_params)

            # Turn the raw BSSID bytes into colon-separated hex text for the
            # control interface commands below.
            bssid = ':'.join(binascii.hexlify(octet)
                             for octet in self.g2_bssid)
            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.scan_for_bss(bssid, freq=2412)
            joiner.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))

            finished = properties['group_object']
            if self.group1 == finished:
                self.group1 = None
            elif self.group2 == finished:
                self.group2 = None

            # Keep waiting while either group is still recorded as active.
            if self.group1 or self.group2:
                return
            self.done = True
            self.loop.quit()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.check_results()

            # Tear down: the joined peer first, then group2 and group1 via
            # their own interface objects.
            WpaSupplicant('wlan2', '/tmp/wpas-wlan2').remove_group()

            logger.info("Disconnect group2")
            dbus.Interface(self.g2_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

            logger.info("Disconnect group1")
            dbus.Interface(self.g1_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

        def check_results(self):
            logger.info("Check results with two concurrent groups in operation")

            def fetch_group_props(group_path):
                # Read all Group properties of the given group object.
                obj = bus.get_object(WPAS_DBUS_SERVICE, group_path)
                return obj.GetAll(WPAS_DBUS_GROUP,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)

            res1 = fetch_group_props(self.group1)
            res2 = fetch_group_props(self.group2)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(res1))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(res2))

            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            # (properties, expected Role, error message prefix)
            role_checks = (
                (res1, 'client', "Group1 role reported incorrectly: "),
                (res2, 'GO', "Group2 role reported incorrectly: "),
                (prop, 'device', "p2pdevice role reported incorrectly: "),
            )
            for props, expected, prefix in role_checks:
                if props['Role'] != expected:
                    raise Exception(prefix + props['Role'])

            if len(res2['Members']) != 1:
                raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()