1 # wpa_supplicant D-Bus interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
9 logger
= logging
.getLogger()
21 from wpasupplicant
import WpaSupplicant
22 from utils
import HwsimSkip
, alloc_fail
, fail_test
23 from p2p_utils
import *
24 from test_ap_tdls
import connect_2sta_open
25 from test_ap_eap
import check_altsubject_match_support
26 from test_nfc_p2p
import set_ip_addr_info
27 from test_wpas_mesh
import check_mesh_support
, add_open_mesh_network
29 WPAS_DBUS_SERVICE
= "fi.w1.wpa_supplicant1"
30 WPAS_DBUS_PATH
= "/fi/w1/wpa_supplicant1"
31 WPAS_DBUS_IFACE
= "fi.w1.wpa_supplicant1.Interface"
32 WPAS_DBUS_IFACE_WPS
= WPAS_DBUS_IFACE
+ ".WPS"
33 WPAS_DBUS_NETWORK
= "fi.w1.wpa_supplicant1.Network"
34 WPAS_DBUS_BSS
= "fi.w1.wpa_supplicant1.BSS"
35 WPAS_DBUS_IFACE_P2PDEVICE
= WPAS_DBUS_IFACE
+ ".P2PDevice"
36 WPAS_DBUS_P2P_PEER
= "fi.w1.wpa_supplicant1.Peer"
37 WPAS_DBUS_GROUP
= "fi.w1.wpa_supplicant1.Group"
38 WPAS_DBUS_PERSISTENT_GROUP
= "fi.w1.wpa_supplicant1.PersistentGroup"
39 WPAS_DBUS_IFACE_MESH
= WPAS_DBUS_IFACE
+ ".Mesh"
41 def prepare_dbus(dev
):
43 logger
.info("No dbus module available")
44 raise HwsimSkip("No dbus module available")
46 from dbus
.mainloop
.glib
import DBusGMainLoop
47 dbus
.mainloop
.glib
.DBusGMainLoop(set_as_default
=True)
48 bus
= dbus
.SystemBus()
49 wpas_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, WPAS_DBUS_PATH
)
50 wpas
= dbus
.Interface(wpas_obj
, WPAS_DBUS_SERVICE
)
51 path
= wpas
.GetInterface(dev
.ifname
)
52 if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
53 return (bus
,wpas_obj
,path
,if_obj
)
55 raise HwsimSkip("Could not connect to D-Bus: %s" % e
)
57 class TestDbus(object):
58 def __init__(self
, bus
):
59 self
.loop
= gobject
.MainLoop()
63 def __exit__(self
, type, value
, traceback
):
64 for s
in self
.signals
:
67 def add_signal(self
, handler
, interface
, name
, byte_arrays
=False):
68 s
= self
.bus
.add_signal_receiver(handler
, dbus_interface
=interface
,
70 byte_arrays
=byte_arrays
)
71 self
.signals
.append(s
)
73 def timeout(self
, *args
):
74 logger
.debug("timeout")
78 class alloc_fail_dbus(object):
79 def __init__(self
, dev
, count
, funcs
, operation
="Operation",
84 self
._operation
= operation
85 self
._expected
= expected
87 cmd
= "TEST_ALLOC_FAIL %d:%s" % (self
._count
, self
._funcs
)
88 if "OK" not in self
._dev
.request(cmd
):
89 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
90 def __exit__(self
, type, value
, traceback
):
92 raise Exception("%s succeeded during out-of-memory" % self
._operation
)
93 if type == dbus
.exceptions
.DBusException
and self
._expected
in str(value
):
95 if self
._dev
.request("GET_ALLOC_FAIL") != "0:%s" % self
._funcs
:
96 raise Exception("%s did not trigger allocation failure" % self
._operation
)
99 def start_ap(ap
, ssid
="test-wps",
100 ap_uuid
="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
101 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
102 "wpa_passphrase": "12345678", "wpa": "2",
103 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
104 "ap_pin": "12345670", "uuid": ap_uuid
}
105 return hostapd
.add_ap(ap
, params
)
107 def test_dbus_getall(dev
, apdev
):
109 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
111 props
= wpas_obj
.GetAll(WPAS_DBUS_SERVICE
,
112 dbus_interface
=dbus
.PROPERTIES_IFACE
)
113 logger
.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props
))
115 props
= if_obj
.GetAll(WPAS_DBUS_IFACE
,
116 dbus_interface
=dbus
.PROPERTIES_IFACE
)
117 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE
, path
, str(props
)))
119 props
= if_obj
.GetAll(WPAS_DBUS_IFACE_WPS
,
120 dbus_interface
=dbus
.PROPERTIES_IFACE
)
121 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS
, path
, str(props
)))
123 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
124 dbus_interface
=dbus
.PROPERTIES_IFACE
)
126 raise Exception("Unexpected BSSs entry: " + str(res
))
128 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
129 dbus_interface
=dbus
.PROPERTIES_IFACE
)
131 raise Exception("Unexpected Networks entry: " + str(res
))
133 hapd
= hostapd
.add_ap(apdev
[0], { "ssid": "open" })
134 bssid
= apdev
[0]['bssid']
135 dev
[0].scan_for_bss(bssid
, freq
=2412)
136 id = dev
[0].add_network()
137 dev
[0].set_network(id, "disabled", "0")
138 dev
[0].set_network_quoted(id, "ssid", "test")
140 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
141 dbus_interface
=dbus
.PROPERTIES_IFACE
)
143 raise Exception("Missing BSSs entry: " + str(res
))
144 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
145 props
= bss_obj
.GetAll(WPAS_DBUS_BSS
, dbus_interface
=dbus
.PROPERTIES_IFACE
)
146 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS
, res
[0], str(props
)))
148 for item
in props
['BSSID']:
149 if len(bssid_str
) > 0:
151 bssid_str
+= '%02x' % item
152 if bssid_str
!= bssid
:
153 raise Exception("Unexpected BSSID in BSSs entry")
155 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
156 dbus_interface
=dbus
.PROPERTIES_IFACE
)
158 raise Exception("Missing Networks entry: " + str(res
))
159 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
160 props
= net_obj
.GetAll(WPAS_DBUS_NETWORK
,
161 dbus_interface
=dbus
.PROPERTIES_IFACE
)
162 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK
, res
[0], str(props
)))
163 ssid
= props
['Properties']['ssid']
165 raise Exception("Unexpected SSID in network entry")
167 def test_dbus_getall_oom(dev
, apdev
):
168 """D-Bus GetAll wpa_config_get_all() OOM"""
169 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
171 id = dev
[0].add_network()
172 dev
[0].set_network(id, "disabled", "0")
173 dev
[0].set_network_quoted(id, "ssid", "test")
175 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
176 dbus_interface
=dbus
.PROPERTIES_IFACE
)
178 raise Exception("Missing Networks entry: " + str(res
))
179 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
180 for i
in range(1, 50):
181 with
alloc_fail(dev
[0], i
, "wpa_config_get_all"):
183 props
= net_obj
.GetAll(WPAS_DBUS_NETWORK
,
184 dbus_interface
=dbus
.PROPERTIES_IFACE
)
185 except dbus
.exceptions
.DBusException
, e
:
188 def dbus_get(dbus
, wpas_obj
, prop
, expect
=None, byte_arrays
=False):
189 val
= wpas_obj
.Get(WPAS_DBUS_SERVICE
, prop
,
190 dbus_interface
=dbus
.PROPERTIES_IFACE
,
191 byte_arrays
=byte_arrays
)
192 if expect
is not None and val
!= expect
:
193 raise Exception("Unexpected %s: %s (expected: %s)" %
194 (prop
, str(val
), str(expect
)))
197 def dbus_set(dbus
, wpas_obj
, prop
, val
):
198 wpas_obj
.Set(WPAS_DBUS_SERVICE
, prop
, val
,
199 dbus_interface
=dbus
.PROPERTIES_IFACE
)
201 def test_dbus_properties(dev
, apdev
):
202 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
203 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
205 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="msgdump")
206 dbus_set(dbus
, wpas_obj
, "DebugLevel", "debug")
207 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="debug")
208 for (val
,err
) in [ (3, "Error.Failed: wrong property type"),
209 ("foo", "Error.Failed: wrong debug level value") ]:
211 dbus_set(dbus
, wpas_obj
, "DebugLevel", val
)
212 raise Exception("Invalid DebugLevel value accepted: " + str(val
))
213 except dbus
.exceptions
.DBusException
, e
:
214 if err
not in str(e
):
215 raise Exception("Unexpected error message: " + str(e
))
216 dbus_set(dbus
, wpas_obj
, "DebugLevel", "msgdump")
217 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="msgdump")
219 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=True)
220 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", False)
221 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=False)
223 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", "foo")
224 raise Exception("Invalid DebugTimestamp value accepted")
225 except dbus
.exceptions
.DBusException
, e
:
226 if "Error.Failed: wrong property type" not in str(e
):
227 raise Exception("Unexpected error message: " + str(e
))
228 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", True)
229 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=True)
231 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=True)
232 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", False)
233 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=False)
235 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", "foo")
236 raise Exception("Invalid DebugShowKeys value accepted")
237 except dbus
.exceptions
.DBusException
, e
:
238 if "Error.Failed: wrong property type" not in str(e
):
239 raise Exception("Unexpected error message: " + str(e
))
240 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", True)
241 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=True)
243 res
= dbus_get(dbus
, wpas_obj
, "Interfaces")
245 raise Exception("Unexpected Interfaces value: " + str(res
))
247 res
= dbus_get(dbus
, wpas_obj
, "EapMethods")
248 if len(res
) < 5 or "TTLS" not in res
:
249 raise Exception("Unexpected EapMethods value: " + str(res
))
251 res
= dbus_get(dbus
, wpas_obj
, "Capabilities")
252 if len(res
) < 2 or "p2p" not in res
:
253 raise Exception("Unexpected Capabilities value: " + str(res
))
255 dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
256 val
= binascii
.unhexlify("010006020304050608")
257 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(val
))
258 res
= dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
260 raise Exception("WFDIEs value changed")
262 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray('\x00'))
263 raise Exception("Invalid WFDIEs value accepted")
264 except dbus
.exceptions
.DBusException
, e
:
265 if "InvalidArgs" not in str(e
):
266 raise Exception("Unexpected error message: " + str(e
))
267 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(''))
268 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(val
))
269 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(''))
270 res
= dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
272 raise Exception("WFDIEs not cleared properly")
274 res
= dbus_get(dbus
, wpas_obj
, "EapMethods")
276 dbus_set(dbus
, wpas_obj
, "EapMethods", res
)
277 raise Exception("Invalid Set accepted")
278 except dbus
.exceptions
.DBusException
, e
:
279 if "InvalidArgs: Property is read-only" not in str(e
):
280 raise Exception("Unexpected error message: " + str(e
))
283 wpas_obj
.SetFoo(WPAS_DBUS_SERVICE
, "DebugShowKeys", True,
284 dbus_interface
=dbus
.PROPERTIES_IFACE
)
285 raise Exception("Unknown method accepted")
286 except dbus
.exceptions
.DBusException
, e
:
287 if "UnknownMethod" not in str(e
):
288 raise Exception("Unexpected error message: " + str(e
))
291 wpas_obj
.Get("foo", "DebugShowKeys",
292 dbus_interface
=dbus
.PROPERTIES_IFACE
)
293 raise Exception("Invalid Get accepted")
294 except dbus
.exceptions
.DBusException
, e
:
295 if "InvalidArgs: No such property" not in str(e
):
296 raise Exception("Unexpected error message: " + str(e
))
298 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, WPAS_DBUS_PATH
,
301 test_obj
.Get(123, "DebugShowKeys",
302 dbus_interface
=dbus
.PROPERTIES_IFACE
)
303 raise Exception("Invalid Get accepted")
304 except dbus
.exceptions
.DBusException
, e
:
305 if "InvalidArgs: Invalid arguments" not in str(e
):
306 raise Exception("Unexpected error message: " + str(e
))
308 test_obj
.Get(WPAS_DBUS_SERVICE
, 123,
309 dbus_interface
=dbus
.PROPERTIES_IFACE
)
310 raise Exception("Invalid Get accepted")
311 except dbus
.exceptions
.DBusException
, e
:
312 if "InvalidArgs: Invalid arguments" not in str(e
):
313 raise Exception("Unexpected error message: " + str(e
))
316 wpas_obj
.Set(WPAS_DBUS_SERVICE
, "WFDIEs",
317 dbus
.ByteArray('', variant_level
=2),
318 dbus_interface
=dbus
.PROPERTIES_IFACE
)
319 raise Exception("Invalid Set accepted")
320 except dbus
.exceptions
.DBusException
, e
:
321 if "InvalidArgs: invalid message format" not in str(e
):
322 raise Exception("Unexpected error message: " + str(e
))
324 def test_dbus_set_global_properties(dev
, apdev
):
325 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
326 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
328 dev
[0].set("model_name", "")
329 props
= [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]
332 res
= if_obj
.Get(WPAS_DBUS_IFACE
, p
[0],
333 dbus_interface
=dbus
.PROPERTIES_IFACE
)
335 raise Exception("Unexpected " + p
[0] + " value: " + str(res
))
337 if_obj
.Set(WPAS_DBUS_IFACE
, p
[0], p
[2],
338 dbus_interface
=dbus
.PROPERTIES_IFACE
)
340 res
= if_obj
.Get(WPAS_DBUS_IFACE
, p
[0],
341 dbus_interface
=dbus
.PROPERTIES_IFACE
)
343 raise Exception("Unexpected " + p
[0] + " value after set: " + str(res
))
344 dev
[0].set("model_name", "")
346 def test_dbus_invalid_method(dev
, apdev
):
347 """D-Bus invalid method"""
348 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
349 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
353 raise Exception("Unknown method accepted")
354 except dbus
.exceptions
.DBusException
, e
:
355 if "UnknownMethod" not in str(e
):
356 raise Exception("Unexpected error message: " + str(e
))
358 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
, introspect
=False)
359 test_wps
= dbus
.Interface(test_obj
, WPAS_DBUS_IFACE_WPS
)
362 raise Exception("WPS.Start with incorrect signature accepted")
363 except dbus
.exceptions
.DBusException
, e
:
364 if "InvalidArgs: Invalid arg" not in str(e
):
365 raise Exception("Unexpected error message: " + str(e
))
367 def test_dbus_get_set_wps(dev
, apdev
):
368 """D-Bus Get/Set for WPS properties"""
370 _test_dbus_get_set_wps(dev
, apdev
)
372 dev
[0].request("SET wps_cred_processing 0")
373 dev
[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
374 dev
[0].set("device_name", "Device A")
375 dev
[0].set("manufacturer", "")
376 dev
[0].set("model_name", "")
377 dev
[0].set("model_number", "")
378 dev
[0].set("serial_number", "")
379 dev
[0].set("device_type", "0-00000000-0")
381 def _test_dbus_get_set_wps(dev
, apdev
):
382 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
384 if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
385 dbus_interface
=dbus
.PROPERTIES_IFACE
)
387 val
= "display keypad virtual_display nfc_interface"
388 dev
[0].request("SET config_methods " + val
)
390 config
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
391 dbus_interface
=dbus
.PROPERTIES_IFACE
)
393 raise Exception("Unexpected Get(ConfigMethods) result: " + config
)
395 val2
= "push_button display"
396 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ConfigMethods", val2
,
397 dbus_interface
=dbus
.PROPERTIES_IFACE
)
398 config
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
399 dbus_interface
=dbus
.PROPERTIES_IFACE
)
401 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config
)
403 dev
[0].request("SET config_methods " + val
)
406 dev
[0].request("SET wps_cred_processing " + str(i
))
407 val
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
408 dbus_interface
=dbus
.PROPERTIES_IFACE
)
409 expected_val
= False if i
== 1 else True
410 if val
!= expected_val
:
411 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i
, val
))
413 tests
= [ ("device_name", "DeviceName"),
414 ("manufacturer", "Manufacturer"),
415 ("model_name", "ModelName"),
416 ("model_number", "ModelNumber"),
417 ("serial_number", "SerialNumber") ]
420 val2
= "test-value-test"
422 val
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, f2
,
423 dbus_interface
=dbus
.PROPERTIES_IFACE
)
425 raise Exception("Get(%s) returned unexpected value" % f2
)
427 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, f2
, val2
,
428 dbus_interface
=dbus
.PROPERTIES_IFACE
)
429 val
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, f2
,
430 dbus_interface
=dbus
.PROPERTIES_IFACE
)
432 raise Exception("Get(%s) returned unexpected value after Set" % f2
)
434 dev
[0].set("device_type", "5-0050F204-1")
435 val
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "DeviceType",
436 dbus_interface
=dbus
.PROPERTIES_IFACE
)
437 if val
[0] != 0x00 or val
[1] != 0x05 != val
[2] != 0x00 or val
[3] != 0x50 or val
[4] != 0xf2 or val
[5] != 0x04 or val
[6] != 0x00 or val
[7] != 0x01:
438 raise Exception("DeviceType mismatch")
439 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "DeviceType", val
,
440 dbus_interface
=dbus
.PROPERTIES_IFACE
)
441 val
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "DeviceType",
442 dbus_interface
=dbus
.PROPERTIES_IFACE
)
443 if val
[0] != 0x00 or val
[1] != 0x05 != val
[2] != 0x00 or val
[3] != 0x50 or val
[4] != 0xf2 or val
[5] != 0x04 or val
[6] != 0x00 or val
[7] != 0x01:
444 raise Exception("DeviceType mismatch after Set")
446 val2
= '\x01\x02\x03\x04\x05\x06\x07\x08'
447 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "DeviceType", dbus
.ByteArray(val2
),
448 dbus_interface
=dbus
.PROPERTIES_IFACE
)
449 val
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "DeviceType",
450 dbus_interface
=dbus
.PROPERTIES_IFACE
,
453 raise Exception("DeviceType mismatch after Set (2)")
455 class TestDbusGetSet(TestDbus
):
456 def __init__(self
, bus
):
457 TestDbus
.__init
__(self
, bus
)
458 self
.signal_received
= False
459 self
.signal_received_deprecated
= False
460 self
.sets_done
= False
463 gobject
.timeout_add(1, self
.run_sets
)
464 gobject
.timeout_add(1000, self
.timeout
)
465 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE_WPS
,
467 self
.add_signal(self
.propertiesChanged2
, dbus
.PROPERTIES_IFACE
,
472 def propertiesChanged(self
, properties
):
473 logger
.debug("PropertiesChanged: " + str(properties
))
474 if properties
.has_key("ProcessCredentials"):
475 self
.signal_received_deprecated
= True
476 if self
.sets_done
and self
.signal_received
:
479 def propertiesChanged2(self
, interface_name
, changed_properties
,
480 invalidated_properties
):
481 logger
.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
482 if interface_name
!= WPAS_DBUS_IFACE_WPS
:
484 if changed_properties
.has_key("ProcessCredentials"):
485 self
.signal_received
= True
486 if self
.sets_done
and self
.signal_received_deprecated
:
489 def run_sets(self
, *args
):
490 logger
.debug("run_sets")
491 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
493 dbus_interface
=dbus
.PROPERTIES_IFACE
)
494 if if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
495 dbus_interface
=dbus
.PROPERTIES_IFACE
) != True:
496 raise Exception("Unexpected Get(ProcessCredentials) result after Set")
497 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
499 dbus_interface
=dbus
.PROPERTIES_IFACE
)
500 if if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
501 dbus_interface
=dbus
.PROPERTIES_IFACE
) != False:
502 raise Exception("Unexpected Get(ProcessCredentials) result after Set")
504 self
.dbus_sets_done
= True
508 return self
.signal_received
and self
.signal_received_deprecated
510 with
TestDbusGetSet(bus
) as t
:
512 raise Exception("No signal received for ProcessCredentials change")
514 def test_dbus_wps_invalid(dev
, apdev
):
515 """D-Bus invaldi WPS operation"""
516 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
517 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
519 failures
= [ {'Role': 'foo', 'Type': 'pbc'},
520 {'Role': 123, 'Type': 'pbc'},
522 {'Role': 'enrollee'},
523 {'Role': 'registrar'},
524 {'Role': 'enrollee', 'Type': 123},
525 {'Role': 'enrollee', 'Type': 'foo'},
526 {'Role': 'enrollee', 'Type': 'pbc',
527 'Bssid': '02:33:44:55:66:77'},
528 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
529 {'Role': 'enrollee', 'Type': 'pbc',
530 'Bssid': dbus
.ByteArray('12345')},
531 {'Role': 'enrollee', 'Type': 'pbc',
532 'P2PDeviceAddress': 12345},
533 {'Role': 'enrollee', 'Type': 'pbc',
534 'P2PDeviceAddress': dbus
.ByteArray('12345')},
535 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
536 for args
in failures
:
539 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args
))
540 except dbus
.exceptions
.DBusException
, e
:
541 if not str(e
).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
542 raise Exception("Unexpected error message: " + str(e
))
544 def test_dbus_wps_oom(dev
, apdev
):
545 """D-Bus WPS operation (OOM)"""
546 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
547 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
549 with
alloc_fail_dbus(dev
[0], 1, "=wpas_dbus_getter_state", "Get"):
550 if_obj
.Get(WPAS_DBUS_IFACE
, "State",
551 dbus_interface
=dbus
.PROPERTIES_IFACE
)
553 hapd
= hostapd
.add_ap(apdev
[0], { "ssid": "open" })
554 bssid
= apdev
[0]['bssid']
555 dev
[0].scan_for_bss(bssid
, freq
=2412)
558 for i
in range(1, 3):
559 with
alloc_fail_dbus(dev
[0], i
, "=wpas_dbus_getter_bsss", "Get"):
560 if_obj
.Get(WPAS_DBUS_IFACE
, "BSSs",
561 dbus_interface
=dbus
.PROPERTIES_IFACE
)
563 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
564 dbus_interface
=dbus
.PROPERTIES_IFACE
)
565 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
566 with
alloc_fail_dbus(dev
[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
567 bss_obj
.Get(WPAS_DBUS_BSS
, "Rates",
568 dbus_interface
=dbus
.PROPERTIES_IFACE
)
569 with
alloc_fail(dev
[0], 1,
570 "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
572 bss_obj
.Get(WPAS_DBUS_BSS
, "Rates",
573 dbus_interface
=dbus
.PROPERTIES_IFACE
)
574 except dbus
.exceptions
.DBusException
, e
:
577 id = dev
[0].add_network()
578 dev
[0].set_network(id, "disabled", "0")
579 dev
[0].set_network_quoted(id, "ssid", "test")
581 for i
in range(1, 3):
582 with
alloc_fail_dbus(dev
[0], i
, "=wpas_dbus_getter_networks", "Get"):
583 if_obj
.Get(WPAS_DBUS_IFACE
, "Networks",
584 dbus_interface
=dbus
.PROPERTIES_IFACE
)
586 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_getter_interfaces", "Get"):
587 dbus_get(dbus
, wpas_obj
, "Interfaces")
589 for i
in range(1, 6):
590 with
alloc_fail_dbus(dev
[0], i
, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
591 dbus_get(dbus
, wpas_obj
, "EapMethods")
593 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_setter_config_methods", "Set",
594 expected
="Error.Failed: Failed to set property"):
595 val2
= "push_button display"
596 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ConfigMethods", val2
,
597 dbus_interface
=dbus
.PROPERTIES_IFACE
)
599 with
alloc_fail_dbus(dev
[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
601 expected
="UnknownError: WPS start failed"):
602 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
604 def test_dbus_wps_pbc(dev
, apdev
):
605 """D-Bus WPS/PBC operation and signals"""
607 _test_dbus_wps_pbc(dev
, apdev
)
609 dev
[0].request("SET wps_cred_processing 0")
611 def _test_dbus_wps_pbc(dev
, apdev
):
612 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
613 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
615 hapd
= start_ap(apdev
[0])
616 hapd
.request("WPS_PBC")
617 bssid
= apdev
[0]['bssid']
618 dev
[0].scan_for_bss(bssid
, freq
="2412")
619 dev
[0].request("SET wps_cred_processing 2")
621 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
622 dbus_interface
=dbus
.PROPERTIES_IFACE
)
624 raise Exception("Missing BSSs entry: " + str(res
))
625 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
626 props
= bss_obj
.GetAll(WPAS_DBUS_BSS
, dbus_interface
=dbus
.PROPERTIES_IFACE
)
627 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS
, res
[0], str(props
)))
628 if 'WPS' not in props
:
629 raise Exception("No WPS information in the BSS entry")
630 if 'Type' not in props
['WPS']:
631 raise Exception("No Type field in the WPS dictionary")
632 if props
['WPS']['Type'] != 'pbc':
633 raise Exception("Unexpected WPS Type: " + props
['WPS']['Type'])
635 class TestDbusWps(TestDbus
):
636 def __init__(self
, bus
, wps
):
637 TestDbus
.__init
__(self
, bus
)
638 self
.success_seen
= False
639 self
.credentials_received
= False
643 gobject
.timeout_add(1, self
.start_pbc
)
644 gobject
.timeout_add(15000, self
.timeout
)
645 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
646 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
651 def wpsEvent(self
, name
, args
):
652 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
653 if name
== "success":
654 self
.success_seen
= True
655 if self
.credentials_received
:
658 def credentials(self
, args
):
659 logger
.debug("credentials: " + str(args
))
660 self
.credentials_received
= True
661 if self
.success_seen
:
664 def start_pbc(self
, *args
):
665 logger
.debug("start_pbc")
666 self
.wps
.Start({'Role': 'enrollee', 'Type': 'pbc'})
670 return self
.success_seen
and self
.credentials_received
672 with
TestDbusWps(bus
, wps
) as t
:
674 raise Exception("Failure in D-Bus operations")
676 dev
[0].wait_connected(timeout
=10)
677 dev
[0].request("DISCONNECT")
679 dev
[0].flush_scan_cache()
681 def test_dbus_wps_pbc_overlap(dev
, apdev
):
682 """D-Bus WPS/PBC operation and signal for PBC overlap"""
683 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
684 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
686 hapd
= start_ap(apdev
[0])
687 hapd2
= start_ap(apdev
[1], ssid
="test-wps2",
688 ap_uuid
="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
689 hapd
.request("WPS_PBC")
690 hapd2
.request("WPS_PBC")
691 bssid
= apdev
[0]['bssid']
692 dev
[0].scan_for_bss(bssid
, freq
="2412")
693 bssid2
= apdev
[1]['bssid']
694 dev
[0].scan_for_bss(bssid2
, freq
="2412")
696 class TestDbusWps(TestDbus
):
697 def __init__(self
, bus
, wps
):
698 TestDbus
.__init
__(self
, bus
)
699 self
.overlap_seen
= False
703 gobject
.timeout_add(1, self
.start_pbc
)
704 gobject
.timeout_add(15000, self
.timeout
)
705 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
709 def wpsEvent(self
, name
, args
):
710 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
711 if name
== "pbc-overlap":
712 self
.overlap_seen
= True
715 def start_pbc(self
, *args
):
716 logger
.debug("start_pbc")
717 self
.wps
.Start({'Role': 'enrollee', 'Type': 'pbc'})
721 return self
.overlap_seen
723 with
TestDbusWps(bus
, wps
) as t
:
725 raise Exception("Failure in D-Bus operations")
727 dev
[0].request("WPS_CANCEL")
728 dev
[0].request("DISCONNECT")
730 dev
[0].flush_scan_cache()
732 def test_dbus_wps_pin(dev
, apdev
):
733 """D-Bus WPS/PIN operation and signals"""
735 _test_dbus_wps_pin(dev
, apdev
)
737 dev
[0].request("SET wps_cred_processing 0")
739 def _test_dbus_wps_pin(dev
, apdev
):
740 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
741 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
743 hapd
= start_ap(apdev
[0])
744 hapd
.request("WPS_PIN any 12345670")
745 bssid
= apdev
[0]['bssid']
746 dev
[0].scan_for_bss(bssid
, freq
="2412")
747 dev
[0].request("SET wps_cred_processing 2")
749 class TestDbusWps(TestDbus
):
750 def __init__(self
, bus
):
751 TestDbus
.__init
__(self
, bus
)
752 self
.success_seen
= False
753 self
.credentials_received
= False
756 gobject
.timeout_add(1, self
.start_pin
)
757 gobject
.timeout_add(15000, self
.timeout
)
758 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
759 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
764 def wpsEvent(self
, name
, args
):
765 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
766 if name
== "success":
767 self
.success_seen
= True
768 if self
.credentials_received
:
771 def credentials(self
, args
):
772 logger
.debug("credentials: " + str(args
))
773 self
.credentials_received
= True
774 if self
.success_seen
:
777 def start_pin(self
, *args
):
778 logger
.debug("start_pin")
779 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
780 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
785 return self
.success_seen
and self
.credentials_received
787 with
TestDbusWps(bus
) as t
:
789 raise Exception("Failure in D-Bus operations")
791 dev
[0].wait_connected(timeout
=10)
793 def test_dbus_wps_pin2(dev
, apdev
):
794 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
796 _test_dbus_wps_pin2(dev
, apdev
)
798 dev
[0].request("SET wps_cred_processing 0")
800 def _test_dbus_wps_pin2(dev
, apdev
):
801 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
802 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
804 hapd
= start_ap(apdev
[0])
805 bssid
= apdev
[0]['bssid']
806 dev
[0].scan_for_bss(bssid
, freq
="2412")
807 dev
[0].request("SET wps_cred_processing 2")
809 class TestDbusWps(TestDbus
):
810 def __init__(self
, bus
):
811 TestDbus
.__init
__(self
, bus
)
812 self
.success_seen
= False
816 gobject
.timeout_add(1, self
.start_pin
)
817 gobject
.timeout_add(15000, self
.timeout
)
818 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
819 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
824 def wpsEvent(self
, name
, args
):
825 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
826 if name
== "success":
827 self
.success_seen
= True
828 if self
.credentials_received
:
831 def credentials(self
, args
):
832 logger
.debug("credentials: " + str(args
))
833 self
.credentials_received
= True
834 if self
.success_seen
:
837 def start_pin(self
, *args
):
838 logger
.debug("start_pin")
839 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
840 res
= wps
.Start({'Role': 'enrollee', 'Type': 'pin',
843 h
= hostapd
.Hostapd(apdev
[0]['ifname'])
844 h
.request("WPS_PIN any " + pin
)
848 return self
.success_seen
and self
.credentials_received
850 with
TestDbusWps(bus
) as t
:
852 raise Exception("Failure in D-Bus operations")
854 dev
[0].wait_connected(timeout
=10)
856 def test_dbus_wps_pin_m2d(dev
, apdev
):
857 """D-Bus WPS/PIN operation and signals with M2D"""
859 _test_dbus_wps_pin_m2d(dev
, apdev
)
861 dev
[0].request("SET wps_cred_processing 0")
863 def _test_dbus_wps_pin_m2d(dev
, apdev
):
864 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
865 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
867 hapd
= start_ap(apdev
[0])
868 bssid
= apdev
[0]['bssid']
869 dev
[0].scan_for_bss(bssid
, freq
="2412")
870 dev
[0].request("SET wps_cred_processing 2")
872 class TestDbusWps(TestDbus
):
873 def __init__(self
, bus
):
874 TestDbus
.__init
__(self
, bus
)
875 self
.success_seen
= False
876 self
.credentials_received
= False
879 gobject
.timeout_add(1, self
.start_pin
)
880 gobject
.timeout_add(15000, self
.timeout
)
881 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
882 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
887 def wpsEvent(self
, name
, args
):
888 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
889 if name
== "success":
890 self
.success_seen
= True
891 if self
.credentials_received
:
894 h
= hostapd
.Hostapd(apdev
[0]['ifname'])
895 h
.request("WPS_PIN any 12345670")
897 def credentials(self
, args
):
898 logger
.debug("credentials: " + str(args
))
899 self
.credentials_received
= True
900 if self
.success_seen
:
903 def start_pin(self
, *args
):
904 logger
.debug("start_pin")
905 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
906 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
911 return self
.success_seen
and self
.credentials_received
913 with
TestDbusWps(bus
) as t
:
915 raise Exception("Failure in D-Bus operations")
917 dev
[0].wait_connected(timeout
=10)
919 def test_dbus_wps_reg(dev
, apdev
):
920 """D-Bus WPS/Registrar operation and signals"""
922 _test_dbus_wps_reg(dev
, apdev
)
924 dev
[0].request("SET wps_cred_processing 0")
926 def _test_dbus_wps_reg(dev
, apdev
):
927 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
928 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
930 hapd
= start_ap(apdev
[0])
931 hapd
.request("WPS_PIN any 12345670")
932 bssid
= apdev
[0]['bssid']
933 dev
[0].scan_for_bss(bssid
, freq
="2412")
934 dev
[0].request("SET wps_cred_processing 2")
936 class TestDbusWps(TestDbus
):
937 def __init__(self
, bus
):
938 TestDbus
.__init
__(self
, bus
)
939 self
.credentials_received
= False
942 gobject
.timeout_add(100, self
.start_reg
)
943 gobject
.timeout_add(15000, self
.timeout
)
944 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
945 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
950 def wpsEvent(self
, name
, args
):
951 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
953 def credentials(self
, args
):
954 logger
.debug("credentials: " + str(args
))
955 self
.credentials_received
= True
958 def start_reg(self
, *args
):
959 logger
.debug("start_reg")
960 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
961 wps
.Start({'Role': 'registrar', 'Type': 'pin',
962 'Pin': '12345670', 'Bssid': bssid_ay
})
966 return self
.credentials_received
968 with
TestDbusWps(bus
) as t
:
970 raise Exception("Failure in D-Bus operations")
972 dev
[0].wait_connected(timeout
=10)
974 def test_dbus_wps_cancel(dev
, apdev
):
975 """D-Bus WPS Cancel operation"""
976 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
977 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
979 hapd
= start_ap(apdev
[0])
980 bssid
= apdev
[0]['bssid']
983 dev
[0].scan_for_bss(bssid
, freq
="2412")
984 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
985 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
988 dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
990 def test_dbus_scan_invalid(dev
, apdev
):
991 """D-Bus invalid scan method"""
992 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
993 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
995 tests
= [ ({}, "InvalidArgs"),
996 ({'Type': 123}, "InvalidArgs"),
997 ({'Type': 'foo'}, "InvalidArgs"),
998 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
999 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
1000 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
1002 'SSIDs': [ dbus
.ByteArray("1"), dbus
.ByteArray("2"),
1003 dbus
.ByteArray("3"), dbus
.ByteArray("4"),
1004 dbus
.ByteArray("5"), dbus
.ByteArray("6"),
1005 dbus
.ByteArray("7"), dbus
.ByteArray("8"),
1006 dbus
.ByteArray("9"), dbus
.ByteArray("10"),
1007 dbus
.ByteArray("11"), dbus
.ByteArray("12"),
1008 dbus
.ByteArray("13"), dbus
.ByteArray("14"),
1009 dbus
.ByteArray("15"), dbus
.ByteArray("16"),
1010 dbus
.ByteArray("17") ]},
1013 'SSIDs': [ dbus
.ByteArray("1234567890abcdef1234567890abcdef1") ]},
1015 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
1016 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
1017 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
1018 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
1020 'Channels': [ (dbus
.Int32(2412), dbus
.UInt32(20)) ] },
1023 'Channels': [ (dbus
.UInt32(2412), dbus
.Int32(20)) ] },
1025 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
1026 ({'Type': 'passive', 'IEs': [ dbus
.ByteArray("\xdd\x00") ]},
1028 ({'Type': 'passive', 'SSIDs': [ dbus
.ByteArray("foo") ]},
1030 for (t
,err
) in tests
:
1033 raise Exception("Invalid Scan() arguments accepted: " + str(t
))
1034 except dbus
.exceptions
.DBusException
, e
:
1035 if err
not in str(e
):
1036 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t
), str(e
)))
1038 def test_dbus_scan_oom(dev
, apdev
):
1039 """D-Bus scan method and OOM"""
1040 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1041 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1043 with
alloc_fail_dbus(dev
[0], 1,
1044 "wpa_scan_clone_params;wpas_dbus_handler_scan",
1045 "Scan", expected
="ScanError: Scan request rejected"):
1046 iface
.Scan({ 'Type': 'passive',
1047 'Channels': [ (dbus
.UInt32(2412), dbus
.UInt32(20)) ] })
1049 with
alloc_fail_dbus(dev
[0], 1,
1050 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
1052 iface
.Scan({ 'Type': 'passive',
1053 'Channels': [ (dbus
.UInt32(2412), dbus
.UInt32(20)) ] })
1055 with
alloc_fail_dbus(dev
[0], 1,
1056 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
1058 iface
.Scan({ 'Type': 'active',
1059 'IEs': [ dbus
.ByteArray("\xdd\x00") ],
1060 'Channels': [ (dbus
.UInt32(2412), dbus
.UInt32(20)) ] })
1062 with
alloc_fail_dbus(dev
[0], 1,
1063 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
1065 iface
.Scan({ 'Type': 'active',
1066 'SSIDs': [ dbus
.ByteArray("open"),
1068 'Channels': [ (dbus
.UInt32(2412), dbus
.UInt32(20)) ] })
1070 def test_dbus_scan(dev
, apdev
):
1071 """D-Bus scan and related signals"""
1072 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1073 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1075 hapd
= hostapd
.add_ap(apdev
[0], { "ssid": "open" })
1077 class TestDbusScan(TestDbus
):
1078 def __init__(self
, bus
):
1079 TestDbus
.__init
__(self
, bus
)
1080 self
.scan_completed
= 0
1081 self
.bss_added
= False
1082 self
.fail_reason
= None
1084 def __enter__(self
):
1085 gobject
.timeout_add(1, self
.run_scan
)
1086 gobject
.timeout_add(15000, self
.timeout
)
1087 self
.add_signal(self
.scanDone
, WPAS_DBUS_IFACE
, "ScanDone")
1088 self
.add_signal(self
.bssAdded
, WPAS_DBUS_IFACE
, "BSSAdded")
1089 self
.add_signal(self
.bssRemoved
, WPAS_DBUS_IFACE
, "BSSRemoved")
1093 def scanDone(self
, success
):
1094 logger
.debug("scanDone: success=%s" % success
)
1095 self
.scan_completed
+= 1
1096 if self
.scan_completed
== 1:
1097 iface
.Scan({'Type': 'passive',
1099 'Channels': [(dbus
.UInt32(2412), dbus
.UInt32(20))]})
1100 elif self
.scan_completed
== 2:
1101 iface
.Scan({'Type': 'passive',
1102 'AllowRoam': False})
1103 elif self
.bss_added
and self
.scan_completed
== 3:
1106 def bssAdded(self
, bss
, properties
):
1107 logger
.debug("bssAdded: %s" % bss
)
1108 logger
.debug(str(properties
))
1109 if 'WPS' in properties
:
1110 if 'Type' in properties
['WPS']:
1111 self
.fail_reason
= "Unexpected WPS dictionary entry in non-WPS BSS"
1113 self
.bss_added
= True
1114 if self
.scan_completed
== 3:
1117 def bssRemoved(self
, bss
):
1118 logger
.debug("bssRemoved: %s" % bss
)
1120 def run_scan(self
, *args
):
1121 logger
.debug("run_scan")
1122 iface
.Scan({'Type': 'active',
1123 'SSIDs': [ dbus
.ByteArray("open"),
1125 'IEs': [ dbus
.ByteArray("\xdd\x00"),
1128 'Channels': [(dbus
.UInt32(2412), dbus
.UInt32(20))]})
1132 return self
.scan_completed
== 3 and self
.bss_added
1134 with
TestDbusScan(bus
) as t
:
1136 raise Exception(t
.fail_reason
)
1138 raise Exception("Expected signals not seen")
1140 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "BSSs",
1141 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1143 raise Exception("Scan result not in BSSs property")
1145 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "BSSs",
1146 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1148 raise Exception("FlushBSS() did not remove scan results from BSSs property")
1151 def test_dbus_scan_busy(dev
, apdev
):
1152 """D-Bus scan trigger rejection when busy with previous scan"""
1153 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1154 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1156 if "OK" not in dev
[0].request("SCAN freq=2412-2462"):
1157 raise Exception("Failed to start scan")
1158 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1160 raise Exception("Scan start timed out")
1163 iface
.Scan({'Type': 'active', 'AllowRoam': False})
1164 raise Exception("Scan() accepted when busy")
1165 except dbus
.exceptions
.DBusException
, e
:
1166 if "ScanError: Scan request reject" not in str(e
):
1167 raise Exception("Unexpected error message: " + str(e
))
1169 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1171 raise Exception("Scan timed out")
1173 def test_dbus_scan_abort(dev
, apdev
):
1174 """D-Bus scan trigger and abort"""
1175 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1176 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1178 iface
.Scan({'Type': 'active', 'AllowRoam': False})
1179 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1181 raise Exception("Scan start timed out")
1184 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1186 raise Exception("Scan abort result timed out")
1187 dev
[0].dump_monitor()
1188 iface
.Scan({'Type': 'active', 'AllowRoam': False})
1191 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1193 raise Exception("Scan timed out")
1195 def test_dbus_connect(dev
, apdev
):
1196 """D-Bus AddNetwork and connect"""
1197 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1198 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1200 ssid
= "test-wpa2-psk"
1201 passphrase
= 'qwertyuiop'
1202 params
= hostapd
.wpa2_params(ssid
=ssid
, passphrase
=passphrase
)
1203 hapd
= hostapd
.add_ap(apdev
[0], params
)
1205 class TestDbusConnect(TestDbus
):
1206 def __init__(self
, bus
):
1207 TestDbus
.__init
__(self
, bus
)
1208 self
.network_added
= False
1209 self
.network_selected
= False
1210 self
.network_removed
= False
1213 def __enter__(self
):
1214 gobject
.timeout_add(1, self
.run_connect
)
1215 gobject
.timeout_add(15000, self
.timeout
)
1216 self
.add_signal(self
.networkAdded
, WPAS_DBUS_IFACE
, "NetworkAdded")
1217 self
.add_signal(self
.networkRemoved
, WPAS_DBUS_IFACE
,
1219 self
.add_signal(self
.networkSelected
, WPAS_DBUS_IFACE
,
1221 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1222 "PropertiesChanged")
1226 def networkAdded(self
, network
, properties
):
1227 logger
.debug("networkAdded: %s" % str(network
))
1228 logger
.debug(str(properties
))
1229 self
.network_added
= True
1231 def networkRemoved(self
, network
):
1232 logger
.debug("networkRemoved: %s" % str(network
))
1233 self
.network_removed
= True
1235 def networkSelected(self
, network
):
1236 logger
.debug("networkSelected: %s" % str(network
))
1237 self
.network_selected
= True
1239 def propertiesChanged(self
, properties
):
1240 logger
.debug("propertiesChanged: %s" % str(properties
))
1241 if 'State' in properties
and properties
['State'] == "completed":
1245 elif self
.state
== 2:
1248 elif self
.state
== 4:
1251 elif self
.state
== 5:
1254 elif self
.state
== 7:
1256 res
= iface
.SignalPoll()
1257 logger
.debug("SignalPoll: " + str(res
))
1258 if 'frequency' not in res
or res
['frequency'] != 2412:
1260 logger
.info("Unexpected SignalPoll result")
1261 iface
.RemoveNetwork(self
.netw
)
1262 if 'State' in properties
and properties
['State'] == "disconnected":
1265 iface
.SelectNetwork(self
.netw
)
1266 elif self
.state
== 3:
1269 elif self
.state
== 6:
1272 elif self
.state
== 8:
1276 def run_connect(self
, *args
):
1277 logger
.debug("run_connect")
1278 args
= dbus
.Dictionary({ 'ssid': ssid
,
1279 'key_mgmt': 'WPA-PSK',
1281 'scan_freq': 2412 },
1283 self
.netw
= iface
.AddNetwork(args
)
1284 iface
.SelectNetwork(self
.netw
)
1288 if not self
.network_added
or \
1289 not self
.network_removed
or \
1290 not self
.network_selected
:
1292 return self
.state
== 9
1294 with
TestDbusConnect(bus
) as t
:
1296 raise Exception("Expected signals not seen")
1298 def test_dbus_connect_psk_mem(dev
, apdev
):
1299 """D-Bus AddNetwork and connect with memory-only PSK"""
1300 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1301 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1303 ssid
= "test-wpa2-psk"
1304 passphrase
= 'qwertyuiop'
1305 params
= hostapd
.wpa2_params(ssid
=ssid
, passphrase
=passphrase
)
1306 hapd
= hostapd
.add_ap(apdev
[0], params
)
1308 class TestDbusConnect(TestDbus
):
1309 def __init__(self
, bus
):
1310 TestDbus
.__init
__(self
, bus
)
1311 self
.connected
= False
1313 def __enter__(self
):
1314 gobject
.timeout_add(1, self
.run_connect
)
1315 gobject
.timeout_add(15000, self
.timeout
)
1316 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1317 "PropertiesChanged")
1318 self
.add_signal(self
.networkRequest
, WPAS_DBUS_IFACE
,
1323 def propertiesChanged(self
, properties
):
1324 logger
.debug("propertiesChanged: %s" % str(properties
))
1325 if 'State' in properties
and properties
['State'] == "completed":
1326 self
.connected
= True
1329 def networkRequest(self
, path
, field
, txt
):
1330 logger
.debug("networkRequest: %s %s %s" % (path
, field
, txt
))
1331 if field
== "PSK_PASSPHRASE":
1332 iface
.NetworkReply(path
, field
, '"' + passphrase
+ '"')
1334 def run_connect(self
, *args
):
1335 logger
.debug("run_connect")
1336 args
= dbus
.Dictionary({ 'ssid': ssid
,
1337 'key_mgmt': 'WPA-PSK',
1339 'scan_freq': 2412 },
1341 self
.netw
= iface
.AddNetwork(args
)
1342 iface
.SelectNetwork(self
.netw
)
1346 return self
.connected
1348 with
TestDbusConnect(bus
) as t
:
1350 raise Exception("Expected signals not seen")
1352 def test_dbus_connect_oom(dev
, apdev
):
1353 """D-Bus AddNetwork and connect when out-of-memory"""
1354 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1355 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1357 if "OK" not in dev
[0].request("TEST_ALLOC_FAIL 0:"):
1358 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1360 ssid
= "test-wpa2-psk"
1361 passphrase
= 'qwertyuiop'
1362 params
= hostapd
.wpa2_params(ssid
=ssid
, passphrase
=passphrase
)
1363 hapd
= hostapd
.add_ap(apdev
[0], params
)
1365 class TestDbusConnect(TestDbus
):
1366 def __init__(self
, bus
):
1367 TestDbus
.__init
__(self
, bus
)
1368 self
.network_added
= False
1369 self
.network_selected
= False
1370 self
.network_removed
= False
1373 def __enter__(self
):
1374 gobject
.timeout_add(1, self
.run_connect
)
1375 gobject
.timeout_add(1500, self
.timeout
)
1376 self
.add_signal(self
.networkAdded
, WPAS_DBUS_IFACE
, "NetworkAdded")
1377 self
.add_signal(self
.networkRemoved
, WPAS_DBUS_IFACE
,
1379 self
.add_signal(self
.networkSelected
, WPAS_DBUS_IFACE
,
1381 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1382 "PropertiesChanged")
1386 def networkAdded(self
, network
, properties
):
1387 logger
.debug("networkAdded: %s" % str(network
))
1388 logger
.debug(str(properties
))
1389 self
.network_added
= True
1391 def networkRemoved(self
, network
):
1392 logger
.debug("networkRemoved: %s" % str(network
))
1393 self
.network_removed
= True
1395 def networkSelected(self
, network
):
1396 logger
.debug("networkSelected: %s" % str(network
))
1397 self
.network_selected
= True
1399 def propertiesChanged(self
, properties
):
1400 logger
.debug("propertiesChanged: %s" % str(properties
))
1401 if 'State' in properties
and properties
['State'] == "completed":
1405 elif self
.state
== 2:
1408 elif self
.state
== 4:
1411 elif self
.state
== 5:
1413 res
= iface
.SignalPoll()
1414 logger
.debug("SignalPoll: " + str(res
))
1415 if 'frequency' not in res
or res
['frequency'] != 2412:
1417 logger
.info("Unexpected SignalPoll result")
1418 iface
.RemoveNetwork(self
.netw
)
1419 if 'State' in properties
and properties
['State'] == "disconnected":
1422 iface
.SelectNetwork(self
.netw
)
1423 elif self
.state
== 3:
1426 elif self
.state
== 6:
1430 def run_connect(self
, *args
):
1431 logger
.debug("run_connect")
1432 args
= dbus
.Dictionary({ 'ssid': ssid
,
1433 'key_mgmt': 'WPA-PSK',
1435 'scan_freq': 2412 },
1438 self
.netw
= iface
.AddNetwork(args
)
1439 except Exception, e
:
1440 logger
.info("Exception on AddNetwork: " + str(e
))
1444 iface
.SelectNetwork(self
.netw
)
1445 except Exception, e
:
1446 logger
.info("Exception on SelectNetwork: " + str(e
))
1452 if not self
.network_added
or \
1453 not self
.network_removed
or \
1454 not self
.network_selected
:
1456 return self
.state
== 7
1459 for i
in range(1, 1000):
1461 dev
[j
].dump_monitor()
1462 dev
[0].request("TEST_ALLOC_FAIL %d:main" % i
)
1464 with
TestDbusConnect(bus
) as t
:
1466 logger
.info("Iteration %d - Expected signals not seen" % i
)
1468 logger
.info("Iteration %d - success" % i
)
1470 state
= dev
[0].request('GET_ALLOC_FAIL')
1471 logger
.info("GET_ALLOC_FAIL: " + state
)
1472 dev
[0].dump_monitor()
1473 dev
[0].request("TEST_ALLOC_FAIL 0:")
1475 raise Exception("Connection succeeded during out-of-memory")
1476 if not state
.startswith('0:'):
1483 # Force regulatory update to re-fetch hw capabilities for the following
1486 dev
[0].dump_monitor()
1487 subprocess
.call(['iw', 'reg', 'set', 'US'])
1488 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
1490 dev
[0].dump_monitor()
1491 subprocess
.call(['iw', 'reg', 'set', '00'])
1492 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
1494 def test_dbus_while_not_connected(dev
, apdev
):
1495 """D-Bus invalid operations while not connected"""
1496 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1497 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1501 raise Exception("Disconnect() accepted when not connected")
1502 except dbus
.exceptions
.DBusException
, e
:
1503 if "NotConnected" not in str(e
):
1504 raise Exception("Unexpected error message for invalid Disconnect: " + str(e
))
1508 raise Exception("Reattach() accepted when not connected")
1509 except dbus
.exceptions
.DBusException
, e
:
1510 if "NotConnected" not in str(e
):
1511 raise Exception("Unexpected error message for invalid Reattach: " + str(e
))
1513 def test_dbus_connect_eap(dev
, apdev
):
1514 """D-Bus AddNetwork and connect to EAP network"""
1515 check_altsubject_match_support(dev
[0])
1516 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1517 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1519 ssid
= "ieee8021x-open"
1520 params
= hostapd
.radius_params()
1521 params
["ssid"] = ssid
1522 params
["ieee8021x"] = "1"
1523 hapd
= hostapd
.add_ap(apdev
[0], params
)
1525 class TestDbusConnect(TestDbus
):
1526 def __init__(self
, bus
):
1527 TestDbus
.__init
__(self
, bus
)
1528 self
.certification_received
= False
1529 self
.eap_status
= False
1532 def __enter__(self
):
1533 gobject
.timeout_add(1, self
.run_connect
)
1534 gobject
.timeout_add(15000, self
.timeout
)
1535 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1536 "PropertiesChanged")
1537 self
.add_signal(self
.certification
, WPAS_DBUS_IFACE
,
1538 "Certification", byte_arrays
=True)
1539 self
.add_signal(self
.networkRequest
, WPAS_DBUS_IFACE
,
1541 self
.add_signal(self
.eap
, WPAS_DBUS_IFACE
, "EAP")
1545 def propertiesChanged(self
, properties
):
1546 logger
.debug("propertiesChanged: %s" % str(properties
))
1547 if 'State' in properties
and properties
['State'] == "completed":
1551 logger
.info("Set dNSName constraint")
1552 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.netw
)
1553 args
= dbus
.Dictionary({ 'altsubject_match':
1554 self
.server_dnsname
},
1556 net_obj
.Set(WPAS_DBUS_NETWORK
, "Properties", args
,
1557 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1558 elif self
.state
== 2:
1561 logger
.info("Set non-matching dNSName constraint")
1562 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.netw
)
1563 args
= dbus
.Dictionary({ 'altsubject_match':
1564 self
.server_dnsname
+ "FOO" },
1566 net_obj
.Set(WPAS_DBUS_NETWORK
, "Properties", args
,
1567 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1568 if 'State' in properties
and properties
['State'] == "disconnected":
1572 iface
.SelectNetwork(self
.netw
)
1575 iface
.SelectNetwork(self
.netw
)
1577 def certification(self
, args
):
1578 logger
.debug("certification: %s" % str(args
))
1579 self
.certification_received
= True
1580 if args
['depth'] == 0:
1581 # The test server certificate is supposed to have dNSName
1582 if len(args
['altsubject']) < 1:
1583 raise Exception("Missing dNSName")
1584 dnsname
= args
['altsubject'][0]
1585 if not dnsname
.startswith("DNS:"):
1586 raise Exception("Expected dNSName not found: " + dnsname
)
1587 logger
.info("altsubject: " + dnsname
)
1588 self
.server_dnsname
= dnsname
1590 def eap(self
, status
, parameter
):
1591 logger
.debug("EAP: status=%s parameter=%s" % (status
, parameter
))
1592 if status
== 'completion' and parameter
== 'success':
1593 self
.eap_status
= True
1594 if self
.state
== 4 and status
== 'remote certificate verification' and parameter
== 'AltSubject mismatch':
1598 def networkRequest(self
, path
, field
, txt
):
1599 logger
.debug("networkRequest: %s %s %s" % (path
, field
, txt
))
1600 if field
== "PASSWORD":
1601 iface
.NetworkReply(path
, field
, "password")
1603 def run_connect(self
, *args
):
1604 logger
.debug("run_connect")
1605 args
= dbus
.Dictionary({ 'ssid': ssid
,
1606 'key_mgmt': 'IEEE8021X',
1609 'anonymous_identity': 'ttls',
1610 'identity': 'pap user',
1611 'ca_cert': 'auth_serv/ca.pem',
1612 'phase2': 'auth=PAP',
1613 'scan_freq': 2412 },
1615 self
.netw
= iface
.AddNetwork(args
)
1616 iface
.SelectNetwork(self
.netw
)
1620 if not self
.eap_status
or not self
.certification_received
:
1622 return self
.state
== 5
1624 with
TestDbusConnect(bus
) as t
:
1626 raise Exception("Expected signals not seen")
1628 def test_dbus_network(dev
, apdev
):
1629 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
1630 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1631 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1633 args
= dbus
.Dictionary({ 'ssid': "foo",
1634 'key_mgmt': 'WPA-PSK',
1636 'identity': dbus
.ByteArray([ 1, 2 ]),
1637 'priority': dbus
.Int32(0),
1638 'scan_freq': dbus
.UInt32(2412) },
1640 netw
= iface
.AddNetwork(args
)
1641 id = int(dev
[0].list_networks()[0]['id'])
1642 val
= dev
[0].get_network(id, "scan_freq")
1644 raise Exception("Invalid scan_freq value: " + str(val
))
1645 iface
.RemoveNetwork(netw
)
1647 args
= dbus
.Dictionary({ 'ssid': "foo",
1649 'scan_freq': "2412 2432",
1650 'freq_list': "2412 2417 2432" },
1652 netw
= iface
.AddNetwork(args
)
1653 id = int(dev
[0].list_networks()[0]['id'])
1654 val
= dev
[0].get_network(id, "scan_freq")
1655 if val
!= "2412 2432":
1656 raise Exception("Invalid scan_freq value (2): " + str(val
))
1657 val
= dev
[0].get_network(id, "freq_list")
1658 if val
!= "2412 2417 2432":
1659 raise Exception("Invalid freq_list value: " + str(val
))
1660 iface
.RemoveNetwork(netw
)
1662 iface
.RemoveNetwork(netw
)
1663 raise Exception("Invalid RemoveNetwork() accepted")
1664 except dbus
.exceptions
.DBusException
, e
:
1665 if "NetworkUnknown" not in str(e
):
1666 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e
))
1668 iface
.SelectNetwork(netw
)
1669 raise Exception("Invalid SelectNetwork() accepted")
1670 except dbus
.exceptions
.DBusException
, e
:
1671 if "NetworkUnknown" not in str(e
):
1672 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e
))
1674 args
= dbus
.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1675 'identity': "testuser", 'scan_freq': '2412' },
1677 netw1
= iface
.AddNetwork(args
)
1678 args
= dbus
.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1680 netw2
= iface
.AddNetwork(args
)
1681 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Networks",
1682 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1684 raise Exception("Unexpected number of networks")
1686 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, netw1
)
1687 res
= net_obj
.Get(WPAS_DBUS_NETWORK
, "Enabled",
1688 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1690 raise Exception("Added network was unexpectedly enabled by default")
1691 net_obj
.Set(WPAS_DBUS_NETWORK
, "Enabled", dbus
.Boolean(True),
1692 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1693 res
= net_obj
.Get(WPAS_DBUS_NETWORK
, "Enabled",
1694 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1696 raise Exception("Set(Enabled,True) did not seem to change property value")
1697 net_obj
.Set(WPAS_DBUS_NETWORK
, "Enabled", dbus
.Boolean(False),
1698 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1699 res
= net_obj
.Get(WPAS_DBUS_NETWORK
, "Enabled",
1700 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1702 raise Exception("Set(Enabled,False) did not seem to change property value")
1704 net_obj
.Set(WPAS_DBUS_NETWORK
, "Enabled", dbus
.UInt32(1),
1705 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1706 raise Exception("Invalid Set(Enabled,1) accepted")
1707 except dbus
.exceptions
.DBusException
, e
:
1708 if "Error.Failed: wrong property type" not in str(e
):
1709 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e
))
1711 args
= dbus
.Dictionary({ 'ssid': "foo1new" }, signature
='sv')
1712 net_obj
.Set(WPAS_DBUS_NETWORK
, "Properties", args
,
1713 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1714 res
= net_obj
.Get(WPAS_DBUS_NETWORK
, "Properties",
1715 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1716 if res
['ssid'] != '"foo1new"':
1717 raise Exception("Set(Properties) failed to update ssid")
1718 if res
['identity'] != '"testuser"':
1719 raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1721 iface
.RemoveAllNetworks()
1722 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Networks",
1723 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1725 raise Exception("Unexpected number of networks")
1726 iface
.RemoveAllNetworks()
1728 tests
= [ dbus
.Dictionary({ 'psk': "1234567" }, signature
='sv'),
1729 dbus
.Dictionary({ 'identity': dbus
.ByteArray() },
1731 dbus
.Dictionary({ 'identity': dbus
.Byte(1) }, signature
='sv'),
1732 dbus
.Dictionary({ 'identity': "" }, signature
='sv') ]
1735 iface
.AddNetwork(args
)
1736 raise Exception("Invalid AddNetwork args accepted: " + str(args
))
1737 except dbus
.exceptions
.DBusException
, e
:
1738 if "InvalidArgs" not in str(e
):
1739 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e
))
1741 def test_dbus_network_oom(dev
, apdev
):
1742 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1743 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1744 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1746 args
= dbus
.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1747 'identity': "testuser", 'scan_freq': '2412' },
1749 netw1
= iface
.AddNetwork(args
)
1750 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, netw1
)
1752 with
alloc_fail_dbus(dev
[0], 1,
1753 "wpa_config_get_all;wpas_dbus_getter_network_properties",
1755 net_obj
.Get(WPAS_DBUS_NETWORK
, "Properties",
1756 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1758 iface
.RemoveAllNetworks()
1760 with
alloc_fail_dbus(dev
[0], 1,
1761 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1762 "RemoveNetwork", "InvalidArgs"):
1763 iface
.RemoveNetwork(dbus
.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1765 with
alloc_fail(dev
[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1766 args
= dbus
.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1769 netw
= iface
.AddNetwork(args
)
1770 # Currently, AddNetwork() succeeds even if os_strdup() for path
1771 # fails, so remove the network if that occurs.
1772 iface
.RemoveNetwork(netw
)
1773 except dbus
.exceptions
.DBusException
, e
:
1776 for i
in range(1, 3):
1777 with
alloc_fail(dev
[0], i
, "=wpas_dbus_register_network"):
1779 netw
= iface
.AddNetwork(args
)
1780 # Currently, AddNetwork() succeeds even if network registration
1781 # fails, so remove the network if that occurs.
1782 iface
.RemoveNetwork(netw
)
1783 except dbus
.exceptions
.DBusException
, e
:
1786 with
alloc_fail_dbus(dev
[0], 1,
1787 "=wpa_config_add_network;wpas_dbus_handler_add_network",
1789 "UnknownError: wpa_supplicant could not add a network"):
1790 args
= dbus
.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1792 netw
= iface
.AddNetwork(args
)
1795 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1796 dbus
.Dictionary({ 'ssid': dbus
.ByteArray(' ') },
1798 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1799 dbus
.Dictionary({ 'ssid': 'foo' }, signature
='sv')),
1800 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1801 dbus
.Dictionary({ 'eap': 'foo' }, signature
='sv')),
1802 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1803 dbus
.Dictionary({ 'priority': dbus
.UInt32(1) },
1805 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1806 dbus
.Dictionary({ 'priority': dbus
.Int32(1) },
1808 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1809 dbus
.Dictionary({ 'ssid': dbus
.ByteArray(' ') },
1811 for (count
,funcs
,args
) in tests
:
1812 with
alloc_fail_dbus(dev
[0], count
, funcs
, "AddNetwork", "InvalidArgs"):
1813 netw
= iface
.AddNetwork(args
)
1815 if len(if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
1816 dbus_interface
=dbus
.PROPERTIES_IFACE
)) > 0:
1817 raise Exception("Unexpected network block added")
1818 if len(dev
[0].list_networks()) > 0:
1819 raise Exception("Unexpected network block visible")
1821 def test_dbus_interface(dev
, apdev
):
1822 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
1824 _test_dbus_interface(dev
, apdev
)
1826 # Need to force P2P channel list update since the 'lo' interface
1827 # with driver=none ends up configuring default dualband channels.
1828 dev
[0].request("SET country US")
1829 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
1831 ev
= dev
[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1833 dev
[0].request("SET country 00")
1834 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
1836 ev
= dev
[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1838 subprocess
.call(['iw', 'reg', 'set', '00'])
1840 def _test_dbus_interface(dev
, apdev
):
1841 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1842 wpas
= dbus
.Interface(wpas_obj
, WPAS_DBUS_SERVICE
)
1844 params
= dbus
.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1846 path
= wpas
.CreateInterface(params
)
1847 logger
.debug("New interface path: " + str(path
))
1848 path2
= wpas
.GetInterface("lo")
1850 raise Exception("Interface object mismatch")
1852 params
= dbus
.Dictionary({ 'Ifname': 'lo',
1854 'ConfigFile': 'foo',
1855 'BridgeIfname': 'foo', },
1858 wpas
.CreateInterface(params
)
1859 raise Exception("Invalid CreateInterface() accepted")
1860 except dbus
.exceptions
.DBusException
, e
:
1861 if "InterfaceExists" not in str(e
):
1862 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e
))
1864 wpas
.RemoveInterface(path
)
1866 wpas
.RemoveInterface(path
)
1867 raise Exception("Invalid RemoveInterface() accepted")
1868 except dbus
.exceptions
.DBusException
, e
:
1869 if "InterfaceUnknown" not in str(e
):
1870 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e
))
1872 params
= dbus
.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
1876 wpas
.CreateInterface(params
)
1877 raise Exception("Invalid CreateInterface() accepted")
1878 except dbus
.exceptions
.DBusException
, e
:
1879 if "InvalidArgs" not in str(e
):
1880 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e
))
1882 params
= dbus
.Dictionary({ 'Driver': 'none' }, signature
='sv')
1884 wpas
.CreateInterface(params
)
1885 raise Exception("Invalid CreateInterface() accepted")
1886 except dbus
.exceptions
.DBusException
, e
:
1887 if "InvalidArgs" not in str(e
):
1888 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e
))
1891 wpas
.GetInterface("lo")
1892 raise Exception("Invalid GetInterface() accepted")
1893 except dbus
.exceptions
.DBusException
, e
:
1894 if "InterfaceUnknown" not in str(e
):
1895 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e
))
1897 def test_dbus_interface_oom(dev
, apdev
):
1898 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
1899 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1900 wpas
= dbus
.Interface(wpas_obj
, WPAS_DBUS_SERVICE
)
1902 with
alloc_fail_dbus(dev
[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
1903 params
= dbus
.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1905 wpas
.CreateInterface(params
)
1907 for i
in range(1, 1000):
1908 dev
[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i
)
1909 params
= dbus
.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1912 npath
= wpas
.CreateInterface(params
)
1913 wpas
.RemoveInterface(npath
)
1914 logger
.info("CreateInterface succeeds after %d allocation failures" % i
)
1915 state
= dev
[0].request('GET_ALLOC_FAIL')
1916 logger
.info("GET_ALLOC_FAIL: " + state
)
1917 dev
[0].dump_monitor()
1918 dev
[0].request("TEST_ALLOC_FAIL 0:")
1920 raise Exception("CreateInterface succeeded during out-of-memory")
1921 if not state
.startswith('0:'):
1923 except dbus
.exceptions
.DBusException
, e
:
1926 for arg
in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
1927 with
alloc_fail_dbus(dev
[0], 1, "=wpas_dbus_handler_create_interface",
1929 params
= dbus
.Dictionary({ arg
: 'foo' }, signature
='sv')
1930 wpas
.CreateInterface(params
)
1932 def test_dbus_blob(dev
, apdev
):
1933 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
1934 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1935 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1937 blob
= dbus
.ByteArray("\x01\x02\x03")
1938 iface
.AddBlob('blob1', blob
)
1940 iface
.AddBlob('blob1', dbus
.ByteArray("\x01\x02\x04"))
1941 raise Exception("Invalid AddBlob() accepted")
1942 except dbus
.exceptions
.DBusException
, e
:
1943 if "BlobExists" not in str(e
):
1944 raise Exception("Unexpected error message for invalid AddBlob: " + str(e
))
1945 res
= iface
.GetBlob('blob1')
1946 if len(res
) != len(blob
):
1947 raise Exception("Unexpected blob data length")
1948 for i
in range(len(res
)):
1949 if res
[i
] != dbus
.Byte(blob
[i
]):
1950 raise Exception("Unexpected blob data")
1951 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Blobs",
1952 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1953 if 'blob1' not in res
:
1954 raise Exception("Added blob missing from Blobs property")
1955 iface
.RemoveBlob('blob1')
1957 iface
.RemoveBlob('blob1')
1958 raise Exception("Invalid RemoveBlob() accepted")
1959 except dbus
.exceptions
.DBusException
, e
:
1960 if "BlobUnknown" not in str(e
):
1961 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e
))
1963 iface
.GetBlob('blob1')
1964 raise Exception("Invalid GetBlob() accepted")
1965 except dbus
.exceptions
.DBusException
, e
:
1966 if "BlobUnknown" not in str(e
):
1967 raise Exception("Unexpected error message for invalid GetBlob: " + str(e
))
1969 class TestDbusBlob(TestDbus
):
1970 def __init__(self
, bus
):
1971 TestDbus
.__init
__(self
, bus
)
1972 self
.blob_added
= False
1973 self
.blob_removed
= False
1975 def __enter__(self
):
1976 gobject
.timeout_add(1, self
.run_blob
)
1977 gobject
.timeout_add(15000, self
.timeout
)
1978 self
.add_signal(self
.blobAdded
, WPAS_DBUS_IFACE
, "BlobAdded")
1979 self
.add_signal(self
.blobRemoved
, WPAS_DBUS_IFACE
, "BlobRemoved")
1983 def blobAdded(self
, blobName
):
1984 logger
.debug("blobAdded: %s" % blobName
)
1985 if blobName
== 'blob2':
1986 self
.blob_added
= True
1988 def blobRemoved(self
, blobName
):
1989 logger
.debug("blobRemoved: %s" % blobName
)
1990 if blobName
== 'blob2':
1991 self
.blob_removed
= True
1994 def run_blob(self
, *args
):
1995 logger
.debug("run_blob")
1996 iface
.AddBlob('blob2', dbus
.ByteArray("\x01\x02\x04"))
1997 iface
.RemoveBlob('blob2')
2001 return self
.blob_added
and self
.blob_removed
2003 with
TestDbusBlob(bus
) as t
:
2005 raise Exception("Expected signals not seen")
2007 def test_dbus_blob_oom(dev
, apdev
):
2008 """D-Bus AddNetwork/RemoveNetwork OOM error cases"""
2009 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2010 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2012 for i
in range(1, 4):
2013 with
alloc_fail_dbus(dev
[0], i
, "wpas_dbus_handler_add_blob",
2015 iface
.AddBlob('blob_no_mem', dbus
.ByteArray("\x01\x02\x03\x04"))
2017 def test_dbus_autoscan(dev
, apdev
):
2018 """D-Bus Autoscan()"""
2019 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2020 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2022 iface
.AutoScan("foo")
2023 iface
.AutoScan("periodic:1")
2025 dev
[0].request("AUTOSCAN ")
2027 def test_dbus_autoscan_oom(dev
, apdev
):
2028 """D-Bus Autoscan() OOM"""
2029 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2030 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2032 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
2033 iface
.AutoScan("foo")
2034 dev
[0].request("AUTOSCAN ")
2036 def test_dbus_tdls_invalid(dev
, apdev
):
2037 """D-Bus invalid TDLS operations"""
2038 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2039 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2041 hapd
= hostapd
.add_ap(apdev
[0], { "ssid": "test-open" })
2042 connect_2sta_open(dev
, hapd
)
2043 addr1
= dev
[1].p2p_interface_addr()
2046 iface
.TDLSDiscover("foo")
2047 raise Exception("Invalid TDLSDiscover() accepted")
2048 except dbus
.exceptions
.DBusException
, e
:
2049 if "InvalidArgs" not in str(e
):
2050 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e
))
2053 iface
.TDLSStatus("foo")
2054 raise Exception("Invalid TDLSStatus() accepted")
2055 except dbus
.exceptions
.DBusException
, e
:
2056 if "InvalidArgs" not in str(e
):
2057 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e
))
2059 res
= iface
.TDLSStatus(addr1
)
2060 if res
!= "peer does not exist":
2061 raise Exception("Unexpected TDLSStatus response")
2064 iface
.TDLSSetup("foo")
2065 raise Exception("Invalid TDLSSetup() accepted")
2066 except dbus
.exceptions
.DBusException
, e
:
2067 if "InvalidArgs" not in str(e
):
2068 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e
))
2071 iface
.TDLSTeardown("foo")
2072 raise Exception("Invalid TDLSTeardown() accepted")
2073 except dbus
.exceptions
.DBusException
, e
:
2074 if "InvalidArgs" not in str(e
):
2075 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e
))
2078 iface
.TDLSTeardown("00:11:22:33:44:55")
2079 raise Exception("TDLSTeardown accepted for unknown peer")
2080 except dbus
.exceptions
.DBusException
, e
:
2081 if "UnknownError: error performing TDLS teardown" not in str(e
):
2082 raise Exception("Unexpected error message: " + str(e
))
2085 iface
.TDLSChannelSwitch({})
2086 raise Exception("Invalid TDLSChannelSwitch() accepted")
2087 except dbus
.exceptions
.DBusException
, e
:
2088 if "InvalidArgs" not in str(e
):
2089 raise Exception("Unexpected error message for invalid TDLSChannelSwitch: " + str(e
))
2092 iface
.TDLSCancelChannelSwitch("foo")
2093 raise Exception("Invalid TDLSCancelChannelSwitch() accepted")
2094 except dbus
.exceptions
.DBusException
, e
:
2095 if "InvalidArgs" not in str(e
):
2096 raise Exception("Unexpected error message for invalid TDLSCancelChannelSwitch: " + str(e
))
2098 def test_dbus_tdls_oom(dev
, apdev
):
2099 """D-Bus TDLS operations during OOM"""
2100 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2101 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2103 with
alloc_fail_dbus(dev
[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
2104 "UnknownError: error performing TDLS setup"):
2105 iface
.TDLSSetup("00:11:22:33:44:55")
2107 def test_dbus_tdls(dev
, apdev
):
2109 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2110 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2112 hapd
= hostapd
.add_ap(apdev
[0], { "ssid": "test-open" })
2113 connect_2sta_open(dev
, hapd
)
2115 addr1
= dev
[1].p2p_interface_addr()
2117 class TestDbusTdls(TestDbus
):
2118 def __init__(self
, bus
):
2119 TestDbus
.__init
__(self
, bus
)
2120 self
.tdls_setup
= False
2121 self
.tdls_teardown
= False
2123 def __enter__(self
):
2124 gobject
.timeout_add(1, self
.run_tdls
)
2125 gobject
.timeout_add(15000, self
.timeout
)
2126 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
2127 "PropertiesChanged")
2131 def propertiesChanged(self
, properties
):
2132 logger
.debug("propertiesChanged: %s" % str(properties
))
2134 def run_tdls(self
, *args
):
2135 logger
.debug("run_tdls")
2136 iface
.TDLSDiscover(addr1
)
2137 gobject
.timeout_add(100, self
.run_tdls2
)
2140 def run_tdls2(self
, *args
):
2141 logger
.debug("run_tdls2")
2142 iface
.TDLSSetup(addr1
)
2143 gobject
.timeout_add(500, self
.run_tdls3
)
2146 def run_tdls3(self
, *args
):
2147 logger
.debug("run_tdls3")
2148 res
= iface
.TDLSStatus(addr1
)
2149 if res
== "connected":
2150 self
.tdls_setup
= True
2152 logger
.info("Unexpected TDLSStatus: " + res
)
2153 iface
.TDLSTeardown(addr1
)
2154 gobject
.timeout_add(200, self
.run_tdls4
)
2157 def run_tdls4(self
, *args
):
2158 logger
.debug("run_tdls4")
2159 res
= iface
.TDLSStatus(addr1
)
2160 if res
== "peer does not exist":
2161 self
.tdls_teardown
= True
2163 logger
.info("Unexpected TDLSStatus: " + res
)
2168 return self
.tdls_setup
and self
.tdls_teardown
2170 with
TestDbusTdls(bus
) as t
:
2172 raise Exception("Expected signals not seen")
2174 def test_dbus_tdls_channel_switch(dev
, apdev
):
2175 """D-Bus TDLS channel switch configuration"""
2176 flags
= int(dev
[0].get_driver_status_field('capa.flags'), 16)
2177 if flags
& 0x800000000 == 0:
2178 raise HwsimSkip("Driver does not support TDLS channel switching")
2180 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2181 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2183 hapd
= hostapd
.add_ap(apdev
[0], { "ssid": "test-open" })
2184 connect_2sta_open(dev
, hapd
)
2186 addr1
= dev
[1].p2p_interface_addr()
2188 class TestDbusTdls(TestDbus
):
2189 def __init__(self
, bus
):
2190 TestDbus
.__init
__(self
, bus
)
2191 self
.tdls_setup
= False
2192 self
.tdls_done
= False
2194 def __enter__(self
):
2195 gobject
.timeout_add(1, self
.run_tdls
)
2196 gobject
.timeout_add(15000, self
.timeout
)
2197 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
2198 "PropertiesChanged")
2202 def propertiesChanged(self
, properties
):
2203 logger
.debug("propertiesChanged: %s" % str(properties
))
2205 def run_tdls(self
, *args
):
2206 logger
.debug("run_tdls")
2207 iface
.TDLSDiscover(addr1
)
2208 gobject
.timeout_add(100, self
.run_tdls2
)
2211 def run_tdls2(self
, *args
):
2212 logger
.debug("run_tdls2")
2213 iface
.TDLSSetup(addr1
)
2214 gobject
.timeout_add(500, self
.run_tdls3
)
2217 def run_tdls3(self
, *args
):
2218 logger
.debug("run_tdls3")
2219 res
= iface
.TDLSStatus(addr1
)
2220 if res
== "connected":
2221 self
.tdls_setup
= True
2223 logger
.info("Unexpected TDLSStatus: " + res
)
2225 # Unknown dict entry
2226 args
= dbus
.Dictionary({ 'Foobar': dbus
.Byte(1) },
2229 iface
.TDLSChannelSwitch(args
)
2230 except Exception, e
:
2231 if "InvalidArgs" not in str(e
):
2232 raise Exception("Unexpected exception")
2235 args
= dbus
.Dictionary({}, signature
='sv')
2237 iface
.TDLSChannelSwitch(args
)
2238 except Exception, e
:
2239 if "InvalidArgs" not in str(e
):
2240 raise Exception("Unexpected exception")
2243 args
= dbus
.Dictionary({ 'OperClass': dbus
.Byte(1) },
2246 iface
.TDLSChannelSwitch(args
)
2247 except Exception, e
:
2248 if "InvalidArgs" not in str(e
):
2249 raise Exception("Unexpected exception")
2251 # Missing PeerAddress
2252 args
= dbus
.Dictionary({ 'OperClass': dbus
.Byte(1),
2253 'Frequency': dbus
.UInt32(2417) },
2256 iface
.TDLSChannelSwitch(args
)
2257 except Exception, e
:
2258 if "InvalidArgs" not in str(e
):
2259 raise Exception("Unexpected exception")
2262 args
= dbus
.Dictionary({ 'OperClass': dbus
.Byte(1),
2263 'Frequency': dbus
.UInt32(2417),
2264 'PeerAddress': addr1
,
2265 'SecChannelOffset': dbus
.UInt32(0),
2266 'CenterFrequency1': dbus
.UInt32(0),
2267 'CenterFrequency2': dbus
.UInt32(0),
2268 'Bandwidth': dbus
.UInt32(20),
2269 'HT': dbus
.Boolean(False),
2270 'VHT': dbus
.Boolean(False) },
2272 iface
.TDLSChannelSwitch(args
)
2274 gobject
.timeout_add(200, self
.run_tdls4
)
2277 def run_tdls4(self
, *args
):
2278 logger
.debug("run_tdls4")
2279 iface
.TDLSCancelChannelSwitch(addr1
)
2280 self
.tdls_done
= True
2285 return self
.tdls_setup
and self
.tdls_done
2287 with
TestDbusTdls(bus
) as t
:
2289 raise Exception("Expected signals not seen")
2291 def test_dbus_pkcs11(dev
, apdev
):
2292 """D-Bus SetPKCS11EngineAndModulePath()"""
2293 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2294 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2297 iface
.SetPKCS11EngineAndModulePath("foo", "bar")
2298 except dbus
.exceptions
.DBusException
, e
:
2299 if "Error.Failed: Reinit of the EAPOL" not in str(e
):
2300 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e
))
2303 iface
.SetPKCS11EngineAndModulePath("foo", "")
2304 except dbus
.exceptions
.DBusException
, e
:
2305 if "Error.Failed: Reinit of the EAPOL" not in str(e
):
2306 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e
))
2308 iface
.SetPKCS11EngineAndModulePath("", "bar")
2309 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11EnginePath",
2310 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2312 raise Exception("Unexpected PKCS11EnginePath value: " + res
)
2313 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11ModulePath",
2314 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2316 raise Exception("Unexpected PKCS11ModulePath value: " + res
)
2318 iface
.SetPKCS11EngineAndModulePath("", "")
2319 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11EnginePath",
2320 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2322 raise Exception("Unexpected PKCS11EnginePath value: " + res
)
2323 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11ModulePath",
2324 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2326 raise Exception("Unexpected PKCS11ModulePath value: " + res
)
2328 def test_dbus_apscan(dev
, apdev
):
2329 """D-Bus Get/Set ApScan"""
2331 _test_dbus_apscan(dev
, apdev
)
2333 dev
[0].request("AP_SCAN 1")
2335 def _test_dbus_apscan(dev
, apdev
):
2336 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2338 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "ApScan",
2339 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2341 raise Exception("Unexpected initial ApScan value: %d" % res
)
2344 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(i
),
2345 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2346 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "ApScan",
2347 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2349 raise Exception("Unexpected ApScan value %d (expected %d)" % (res
, i
))
2352 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.Int16(-1),
2353 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2354 raise Exception("Invalid Set(ApScan,-1) accepted")
2355 except dbus
.exceptions
.DBusException
, e
:
2356 if "Error.Failed: wrong property type" not in str(e
):
2357 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e
))
2360 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(123),
2361 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2362 raise Exception("Invalid Set(ApScan,123) accepted")
2363 except dbus
.exceptions
.DBusException
, e
:
2364 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e
):
2365 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e
))
2367 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(1),
2368 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2370 def test_dbus_pmf(dev
, apdev
):
2371 """D-Bus Get/Set Pmf"""
2373 _test_dbus_pmf(dev
, apdev
)
2375 dev
[0].request("SET pmf 0")
2377 def _test_dbus_pmf(dev
, apdev
):
2378 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2380 dev
[0].set("pmf", "0")
2381 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Pmf",
2382 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2384 raise Exception("Unexpected initial Pmf value: %s" % res
)
2387 if_obj
.Set(WPAS_DBUS_IFACE
, "Pmf", str(i
),
2388 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2389 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Pmf",
2390 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2392 raise Exception("Unexpected Pmf value %s (expected %d)" % (res
, i
))
2394 if_obj
.Set(WPAS_DBUS_IFACE
, "Pmf", "1",
2395 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2397 def test_dbus_fastreauth(dev
, apdev
):
2398 """D-Bus Get/Set FastReauth"""
2399 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2401 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "FastReauth",
2402 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2404 raise Exception("Unexpected initial FastReauth value: " + str(res
))
2406 for i
in [ False, True ]:
2407 if_obj
.Set(WPAS_DBUS_IFACE
, "FastReauth", dbus
.Boolean(i
),
2408 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2409 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "FastReauth",
2410 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2412 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res
, i
))
2415 if_obj
.Set(WPAS_DBUS_IFACE
, "FastReauth", dbus
.Int16(-1),
2416 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2417 raise Exception("Invalid Set(FastReauth,-1) accepted")
2418 except dbus
.exceptions
.DBusException
, e
:
2419 if "Error.Failed: wrong property type" not in str(e
):
2420 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e
))
2422 if_obj
.Set(WPAS_DBUS_IFACE
, "FastReauth", dbus
.Boolean(True),
2423 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2425 def test_dbus_bss_expire(dev
, apdev
):
2426 """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
2427 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2429 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireAge", dbus
.UInt32(179),
2430 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2431 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "BSSExpireAge",
2432 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2434 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res
, i
))
2436 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireCount", dbus
.UInt32(3),
2437 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2438 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "BSSExpireCount",
2439 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2441 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res
, i
))
2444 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireAge", dbus
.Int16(-1),
2445 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2446 raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
2447 except dbus
.exceptions
.DBusException
, e
:
2448 if "Error.Failed: wrong property type" not in str(e
):
2449 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e
))
2452 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireAge", dbus
.UInt32(9),
2453 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2454 raise Exception("Invalid Set(BSSExpireAge,9) accepted")
2455 except dbus
.exceptions
.DBusException
, e
:
2456 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e
):
2457 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e
))
2460 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireCount", dbus
.Int16(-1),
2461 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2462 raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
2463 except dbus
.exceptions
.DBusException
, e
:
2464 if "Error.Failed: wrong property type" not in str(e
):
2465 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e
))
2468 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireCount", dbus
.UInt32(0),
2469 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2470 raise Exception("Invalid Set(BSSExpireCount,0) accepted")
2471 except dbus
.exceptions
.DBusException
, e
:
2472 if "Error.Failed: BSSExpireCount must be > 0" not in str(e
):
2473 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e
))
2475 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireAge", dbus
.UInt32(180),
2476 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2477 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireCount", dbus
.UInt32(2),
2478 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2480 def test_dbus_country(dev
, apdev
):
2481 """D-Bus Get/Set Country"""
2483 _test_dbus_country(dev
, apdev
)
2485 dev
[0].request("SET country 00")
2486 subprocess
.call(['iw', 'reg', 'set', '00'])
2488 def _test_dbus_country(dev
, apdev
):
2489 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2491 # work around issues with possible pending regdom event from the end of
2492 # the previous test case
2494 dev
[0].dump_monitor()
2496 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "FI",
2497 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2498 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Country",
2499 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2501 raise Exception("Unexpected Country value %s (expected FI)" % res
)
2503 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2505 # For now, work around separate P2P Device interface event delivery
2506 ev
= dev
[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
2508 raise Exception("regdom change event not seen")
2509 if "init=USER type=COUNTRY alpha2=FI" not in ev
:
2510 raise Exception("Unexpected event contents: " + ev
)
2513 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", dbus
.Int16(-1),
2514 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2515 raise Exception("Invalid Set(Country,-1) accepted")
2516 except dbus
.exceptions
.DBusException
, e
:
2517 if "Error.Failed: wrong property type" not in str(e
):
2518 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e
))
2521 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "F",
2522 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2523 raise Exception("Invalid Set(Country,F) accepted")
2524 except dbus
.exceptions
.DBusException
, e
:
2525 if "Error.Failed: invalid country code" not in str(e
):
2526 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e
))
2528 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "00",
2529 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2531 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2533 # For now, work around separate P2P Device interface event delivery
2534 ev
= dev
[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
2536 raise Exception("regdom change event not seen")
2537 # init=CORE was previously used due to invalid db.txt data for 00. For
2538 # now, allow both it and the new init=USER after fixed db.txt.
2539 if "init=CORE type=WORLD" not in ev
and "init=USER type=WORLD" not in ev
:
2540 raise Exception("Unexpected event contents: " + ev
)
2542 def test_dbus_scan_interval(dev
, apdev
):
2543 """D-Bus Get/Set ScanInterval"""
2545 _test_dbus_scan_interval(dev
, apdev
)
2547 dev
[0].request("SCAN_INTERVAL 5")
2549 def _test_dbus_scan_interval(dev
, apdev
):
2550 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2552 if_obj
.Set(WPAS_DBUS_IFACE
, "ScanInterval", dbus
.Int32(3),
2553 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2554 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "ScanInterval",
2555 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2557 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res
, i
))
2560 if_obj
.Set(WPAS_DBUS_IFACE
, "ScanInterval", dbus
.UInt16(100),
2561 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2562 raise Exception("Invalid Set(ScanInterval,100) accepted")
2563 except dbus
.exceptions
.DBusException
, e
:
2564 if "Error.Failed: wrong property type" not in str(e
):
2565 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e
))
2568 if_obj
.Set(WPAS_DBUS_IFACE
, "ScanInterval", dbus
.Int32(-1),
2569 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2570 raise Exception("Invalid Set(ScanInterval,-1) accepted")
2571 except dbus
.exceptions
.DBusException
, e
:
2572 if "Error.Failed: scan_interval must be >= 0" not in str(e
):
2573 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e
))
2575 if_obj
.Set(WPAS_DBUS_IFACE
, "ScanInterval", dbus
.Int32(5),
2576 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2578 def test_dbus_probe_req_reporting(dev
, apdev
):
2579 """D-Bus Probe Request reporting"""
2580 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2582 dev
[1].p2p_find(social
=True)
2584 class TestDbusProbe(TestDbus
):
2585 def __init__(self
, bus
):
2586 TestDbus
.__init
__(self
, bus
)
2587 self
.reported
= False
2589 def __enter__(self
):
2590 gobject
.timeout_add(1, self
.run_test
)
2591 gobject
.timeout_add(15000, self
.timeout
)
2592 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
2594 self
.add_signal(self
.probeRequest
, WPAS_DBUS_IFACE
, "ProbeRequest",
2599 def groupStarted(self
, properties
):
2600 logger
.debug("groupStarted: " + str(properties
))
2601 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
2602 properties
['interface_object'])
2603 self
.iface
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE
)
2604 self
.iface
.SubscribeProbeReq()
2605 self
.group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2607 def probeRequest(self
, args
):
2608 logger
.debug("probeRequest: args=%s" % str(args
))
2609 self
.reported
= True
2612 def run_test(self
, *args
):
2613 logger
.debug("run_test")
2614 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2615 params
= dbus
.Dictionary({ 'frequency': 2412 })
2616 p2p
.GroupAdd(params
)
2620 return self
.reported
2622 with
TestDbusProbe(bus
) as t
:
2624 raise Exception("Expected signals not seen")
2625 t
.iface
.UnsubscribeProbeReq()
2627 t
.iface
.UnsubscribeProbeReq()
2628 raise Exception("Invalid UnsubscribeProbeReq() accepted")
2629 except dbus
.exceptions
.DBusException
, e
:
2630 if "NoSubscription" not in str(e
):
2631 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e
))
2632 t
.group_p2p
.Disconnect()
2634 with
TestDbusProbe(bus
) as t
:
2636 raise Exception("Expected signals not seen")
2637 # On purpose, leave ProbeReq subscription in place to test automatic
2640 dev
[1].p2p_stop_find()
2642 def test_dbus_probe_req_reporting_oom(dev
, apdev
):
2643 """D-Bus Probe Request reporting (OOM)"""
2644 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2645 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2647 # Need to make sure this process has not already subscribed to avoid false
2648 # failures due to the operation succeeding due to os_strdup() not even
2651 iface
.UnsubscribeProbeReq()
2652 was_subscribed
= True
2653 except dbus
.exceptions
.DBusException
, e
:
2654 was_subscribed
= False
2657 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_handler_subscribe_preq",
2658 "SubscribeProbeReq"):
2659 iface
.SubscribeProbeReq()
2662 # On purpose, leave ProbeReq subscription in place to test automatic
2664 iface
.SubscribeProbeReq()
2666 def test_dbus_p2p_invalid(dev
, apdev
):
2667 """D-Bus invalid P2P operations"""
2668 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2669 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2672 p2p
.RejectPeer(path
+ "/Peers/00112233445566")
2673 raise Exception("Invalid RejectPeer accepted")
2674 except dbus
.exceptions
.DBusException
, e
:
2675 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e
):
2676 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
2679 p2p
.RejectPeer("/foo")
2680 raise Exception("Invalid RejectPeer accepted")
2681 except dbus
.exceptions
.DBusException
, e
:
2682 if "InvalidArgs" not in str(e
):
2683 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
2693 raise Exception("Invalid RemoveClient accepted")
2694 except dbus
.exceptions
.DBusException
, e
:
2695 if "InvalidArgs" not in str(e
):
2696 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e
))
2698 tests
= [ {'DiscoveryType': 'foo'},
2699 {'RequestedDeviceTypes': 'foo'},
2700 {'RequestedDeviceTypes': ['foo']},
2701 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2702 '10','11','12','13','14','15','16',
2704 {'RequestedDeviceTypes': dbus
.Array([], signature
="s")},
2705 {'RequestedDeviceTypes': dbus
.Array([['foo']], signature
="as")},
2706 {'RequestedDeviceTypes': dbus
.Array([], signature
="i")},
2707 {'RequestedDeviceTypes': [dbus
.ByteArray('12345678'),
2708 dbus
.ByteArray('1234567')]},
2709 {'Foo': dbus
.Int16(1)},
2710 {'Foo': dbus
.UInt16(1)},
2711 {'Foo': dbus
.Int64(1)},
2712 {'Foo': dbus
.UInt64(1)},
2713 {'Foo': dbus
.Double(1.23)},
2714 {'Foo': dbus
.Signature('s')},
2718 p2p
.Find(dbus
.Dictionary(t
))
2719 raise Exception("Invalid Find accepted")
2720 except dbus
.exceptions
.DBusException
, e
:
2721 if "InvalidArgs" not in str(e
):
2722 raise Exception("Unexpected error message for invalid Find(): " + str(e
))
2725 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2726 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2728 p2p
.RemovePersistentGroup(dbus
.ObjectPath(p
))
2729 raise Exception("Invalid RemovePersistentGroup accepted")
2730 except dbus
.exceptions
.DBusException
, e
:
2731 if "InvalidArgs" not in str(e
):
2732 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e
))
2735 dev
[0].request("P2P_SET disabled 1")
2737 raise Exception("Invalid Listen accepted")
2738 except dbus
.exceptions
.DBusException
, e
:
2739 if "UnknownError: Could not start P2P listen" not in str(e
):
2740 raise Exception("Unexpected error message for invalid Listen: " + str(e
))
2742 dev
[0].request("P2P_SET disabled 0")
2744 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
, introspect
=False)
2745 test_p2p
= dbus
.Interface(test_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2747 test_p2p
.Listen("foo")
2748 raise Exception("Invalid Listen accepted")
2749 except dbus
.exceptions
.DBusException
, e
:
2750 if "InvalidArgs" not in str(e
):
2751 raise Exception("Unexpected error message for invalid Listen: " + str(e
))
2754 dev
[0].request("P2P_SET disabled 1")
2755 p2p
.ExtendedListen(dbus
.Dictionary({}))
2756 raise Exception("Invalid ExtendedListen accepted")
2757 except dbus
.exceptions
.DBusException
, e
:
2758 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e
):
2759 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e
))
2761 dev
[0].request("P2P_SET disabled 0")
2764 dev
[0].request("P2P_SET disabled 1")
2765 args
= { 'duration1': 30000, 'interval1': 102400,
2766 'duration2': 20000, 'interval2': 102400 }
2767 p2p
.PresenceRequest(args
)
2768 raise Exception("Invalid PresenceRequest accepted")
2769 except dbus
.exceptions
.DBusException
, e
:
2770 if "UnknownError: Failed to invoke presence request" not in str(e
):
2771 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e
))
2773 dev
[0].request("P2P_SET disabled 0")
2776 params
= dbus
.Dictionary({'frequency': dbus
.Int32(-1)})
2777 p2p
.GroupAdd(params
)
2778 raise Exception("Invalid GroupAdd accepted")
2779 except dbus
.exceptions
.DBusException
, e
:
2780 if "InvalidArgs" not in str(e
):
2781 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e
))
2784 params
= dbus
.Dictionary({'persistent_group_object':
2785 dbus
.ObjectPath(path
),
2787 p2p
.GroupAdd(params
)
2788 raise Exception("Invalid GroupAdd accepted")
2789 except dbus
.exceptions
.DBusException
, e
:
2790 if "InvalidArgs" not in str(e
):
2791 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e
))
2795 raise Exception("Invalid Disconnect accepted")
2796 except dbus
.exceptions
.DBusException
, e
:
2797 if "UnknownError: failed to disconnect" not in str(e
):
2798 raise Exception("Unexpected error message for invalid Disconnect: " + str(e
))
2801 dev
[0].request("P2P_SET disabled 1")
2803 raise Exception("Invalid Flush accepted")
2804 except dbus
.exceptions
.DBusException
, e
:
2805 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2806 raise Exception("Unexpected error message for invalid Flush: " + str(e
))
2808 dev
[0].request("P2P_SET disabled 0")
2811 dev
[0].request("P2P_SET disabled 1")
2812 args
= { 'peer': path
,
2814 'wps_method': 'pbc',
2816 pin
= p2p
.Connect(args
)
2817 raise Exception("Invalid Connect accepted")
2818 except dbus
.exceptions
.DBusException
, e
:
2819 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2820 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2822 dev
[0].request("P2P_SET disabled 0")
2824 tests
= [ { 'frequency': dbus
.Int32(-1) },
2825 { 'wps_method': 'pbc' },
2826 { 'wps_method': 'foo' } ]
2829 pin
= p2p
.Connect(args
)
2830 raise Exception("Invalid Connect accepted")
2831 except dbus
.exceptions
.DBusException
, e
:
2832 if "InvalidArgs" not in str(e
):
2833 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2836 dev
[0].request("P2P_SET disabled 1")
2837 args
= { 'peer': path
}
2838 pin
= p2p
.Invite(args
)
2839 raise Exception("Invalid Invite accepted")
2840 except dbus
.exceptions
.DBusException
, e
:
2841 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2842 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
2844 dev
[0].request("P2P_SET disabled 0")
2847 args
= { 'foo': 'bar' }
2848 pin
= p2p
.Invite(args
)
2849 raise Exception("Invalid Invite accepted")
2850 except dbus
.exceptions
.DBusException
, e
:
2851 if "InvalidArgs" not in str(e
):
2852 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2854 tests
= [ (path
, 'display', "InvalidArgs"),
2855 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2857 "UnknownError: Failed to send provision discovery request"),
2858 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2860 "UnknownError: Failed to send provision discovery request"),
2861 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2863 "UnknownError: Failed to send provision discovery request"),
2864 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2866 "UnknownError: Failed to send provision discovery request"),
2867 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2868 'foo', "InvalidArgs") ]
2869 for (p
,method
,err
) in tests
:
2871 p2p
.ProvisionDiscoveryRequest(p
, method
)
2872 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2873 except dbus
.exceptions
.DBusException
, e
:
2874 if err
not in str(e
):
2875 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e
))
2878 dev
[0].request("P2P_SET disabled 1")
2879 if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Peers",
2880 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2881 raise Exception("Invalid Get(Peers) accepted")
2882 except dbus
.exceptions
.DBusException
, e
:
2883 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2884 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e
))
2886 dev
[0].request("P2P_SET disabled 0")
2888 def test_dbus_p2p_oom(dev
, apdev
):
2889 """D-Bus P2P operations and OOM"""
2890 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2891 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2893 with
alloc_fail_dbus(dev
[0], 1, "_wpa_dbus_dict_entry_get_string_array",
2894 "Find", "InvalidArgs"):
2895 p2p
.Find(dbus
.Dictionary({ 'Foo': [ 'bar' ] }))
2897 with
alloc_fail_dbus(dev
[0], 2, "_wpa_dbus_dict_entry_get_string_array",
2898 "Find", "InvalidArgs"):
2899 p2p
.Find(dbus
.Dictionary({ 'Foo': [ 'bar' ] }))
2901 with
alloc_fail_dbus(dev
[0], 10, "_wpa_dbus_dict_entry_get_string_array",
2902 "Find", "InvalidArgs"):
2903 p2p
.Find(dbus
.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }))
2905 with
alloc_fail_dbus(dev
[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
2906 "Find", "InvalidArgs"):
2907 p2p
.Find(dbus
.Dictionary({ 'Foo': [ dbus
.ByteArray('123') ] }))
2909 with
alloc_fail_dbus(dev
[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
2910 "Find", "InvalidArgs"):
2911 p2p
.Find(dbus
.Dictionary({ 'Foo': [ dbus
.ByteArray('123') ] }))
2913 with
alloc_fail_dbus(dev
[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
2914 "Find", "InvalidArgs"):
2915 p2p
.Find(dbus
.Dictionary({ 'Foo': [ dbus
.ByteArray('123'),
2916 dbus
.ByteArray('123'),
2917 dbus
.ByteArray('123'),
2918 dbus
.ByteArray('123'),
2919 dbus
.ByteArray('123'),
2920 dbus
.ByteArray('123'),
2921 dbus
.ByteArray('123'),
2922 dbus
.ByteArray('123'),
2923 dbus
.ByteArray('123'),
2924 dbus
.ByteArray('123'),
2925 dbus
.ByteArray('123') ] }))
2927 with
alloc_fail_dbus(dev
[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
2928 "Find", "InvalidArgs"):
2929 p2p
.Find(dbus
.Dictionary({ 'Foo': [ dbus
.ByteArray('123') ] }))
2931 with
alloc_fail_dbus(dev
[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
2932 "Find", "InvalidArgs"):
2933 p2p
.Find(dbus
.Dictionary({ 'Foo': path
}))
2935 with
alloc_fail_dbus(dev
[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
2936 "AddService", "InvalidArgs"):
2937 args
= { 'service_type': 'bonjour',
2938 'response': dbus
.ByteArray(500*'b') }
2939 p2p
.AddService(args
)
2941 with
alloc_fail_dbus(dev
[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
2942 "AddService", "InvalidArgs"):
2943 p2p
.AddService(args
)
2945 def test_dbus_p2p_discovery(dev
, apdev
):
2946 """D-Bus P2P discovery"""
2947 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2948 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2950 addr0
= dev
[0].p2p_dev_addr()
2952 dev
[1].request("SET sec_device_type 1-0050F204-2")
2953 dev
[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2955 addr1
= dev
[1].p2p_dev_addr()
2956 a1
= binascii
.unhexlify(addr1
.replace(':',''))
2958 wfd_devinfo
= "00001c440028"
2959 dev
[2].request("SET wifi_display 1")
2960 dev
[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo
)
2961 wfd
= binascii
.unhexlify('000006' + wfd_devinfo
)
2963 addr2
= dev
[2].p2p_dev_addr()
2964 a2
= binascii
.unhexlify(addr2
.replace(':',''))
2966 res
= if_obj
.GetAll(WPAS_DBUS_IFACE_P2PDEVICE
,
2967 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2968 if 'Peers' not in res
:
2969 raise Exception("GetAll result missing Peers")
2970 if len(res
['Peers']) != 0:
2971 raise Exception("Unexpected peer(s) in the list")
2973 args
= {'DiscoveryType': 'social',
2974 'RequestedDeviceTypes': [dbus
.ByteArray('12345678')],
2975 'Timeout': dbus
.Int32(1) }
2976 p2p
.Find(dbus
.Dictionary(args
))
2979 class TestDbusP2p(TestDbus
):
2980 def __init__(self
, bus
):
2981 TestDbus
.__init
__(self
, bus
)
2984 self
.found_prop
= False
2986 self
.find_stopped
= False
2988 def __enter__(self
):
2989 gobject
.timeout_add(1, self
.run_test
)
2990 gobject
.timeout_add(15000, self
.timeout
)
2991 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
2993 self
.add_signal(self
.deviceFoundProperties
,
2994 WPAS_DBUS_IFACE_P2PDEVICE
, "DeviceFoundProperties")
2995 self
.add_signal(self
.deviceLost
, WPAS_DBUS_IFACE_P2PDEVICE
,
2997 self
.add_signal(self
.provisionDiscoveryResponseEnterPin
,
2998 WPAS_DBUS_IFACE_P2PDEVICE
,
2999 "ProvisionDiscoveryResponseEnterPin")
3000 self
.add_signal(self
.findStopped
, WPAS_DBUS_IFACE_P2PDEVICE
,
3005 def deviceFound(self
, path
):
3006 logger
.debug("deviceFound: path=%s" % path
)
3007 res
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Peers",
3008 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3010 raise Exception("Unexpected number of peers")
3012 raise Exception("Mismatch in peer object path")
3013 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
3014 res
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3015 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3017 logger
.debug("peer properties: " + str(res
))
3019 if res
['DeviceAddress'] == a1
:
3020 if 'SecondaryDeviceTypes' not in res
:
3021 raise Exception("Missing SecondaryDeviceTypes")
3022 sec
= res
['SecondaryDeviceTypes']
3024 raise Exception("Secondary device type missing")
3025 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec
:
3026 raise Exception("Secondary device type mismatch")
3028 if 'VendorExtension' not in res
:
3029 raise Exception("Missing VendorExtension")
3030 vendor
= res
['VendorExtension']
3032 raise Exception("Vendor extension missing")
3033 if "\x11\x22\x33\x44" not in vendor
:
3034 raise Exception("Secondary device type mismatch")
3037 elif res
['DeviceAddress'] == a2
:
3038 if 'IEs' not in res
:
3039 raise Exception("IEs missing")
3040 if res
['IEs'] != wfd
:
3041 raise Exception("IEs mismatch")
3044 raise Exception("Unexpected peer device address")
3046 if self
.found
and self
.found2
:
3048 p2p
.RejectPeer(path
)
3049 p2p
.ProvisionDiscoveryRequest(path
, 'display')
3051 def deviceLost(self
, path
):
3052 logger
.debug("deviceLost: path=%s" % path
)
3053 if not self
.found
or not self
.found2
:
3054 # This may happen if a previous test case ended up scheduling
3055 # deviceLost event and that event did not get delivered before
3056 # starting the next test execution.
3057 logger
.debug("Ignore deviceLost before the deviceFound events")
3061 p2p
.RejectPeer(path
)
3062 raise Exception("Invalid RejectPeer accepted")
3063 except dbus
.exceptions
.DBusException
, e
:
3064 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e
):
3065 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
3068 def deviceFoundProperties(self
, path
, properties
):
3069 logger
.debug("deviceFoundProperties: path=%s" % path
)
3070 logger
.debug("peer properties: " + str(properties
))
3071 if properties
['DeviceAddress'] == a1
:
3072 self
.found_prop
= True
3074 def provisionDiscoveryResponseEnterPin(self
, peer_object
):
3075 logger
.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object
)
3078 def findStopped(self
):
3079 logger
.debug("findStopped")
3080 self
.find_stopped
= True
3082 def run_test(self
, *args
):
3083 logger
.debug("run_test")
3084 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social',
3085 'Timeout': dbus
.Int32(10)}))
3089 return self
.found
and self
.lost
and self
.found2
and self
.find_stopped
3091 with
TestDbusP2p(bus
) as t
:
3093 raise Exception("Expected signals not seen")
3095 dev
[1].request("VENDOR_ELEM_REMOVE 1 *")
3096 dev
[1].p2p_stop_find()
3099 dev
[2].p2p_stop_find()
3100 dev
[2].request("P2P_FLUSH")
3101 if not dev
[2].discover_peer(addr0
):
3102 raise Exception("Peer not found")
3104 dev
[2].p2p_stop_find()
3107 p2p
.ExtendedListen(dbus
.Dictionary({'foo': 100}))
3108 raise Exception("Invalid ExtendedListen accepted")
3109 except dbus
.exceptions
.DBusException
, e
:
3110 if "InvalidArgs" not in str(e
):
3111 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e
))
3113 p2p
.ExtendedListen(dbus
.Dictionary({'period': 100, 'interval': 1000}))
3114 p2p
.ExtendedListen(dbus
.Dictionary({}))
3115 dev
[0].global_request("P2P_EXT_LISTEN")
3117 def test_dbus_p2p_discovery_freq(dev
, apdev
):
3118 """D-Bus P2P discovery on a specific non-social channel"""
3119 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3120 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3122 addr1
= dev
[1].p2p_dev_addr()
3123 autogo(dev
[1], freq
=2422)
3125 class TestDbusP2p(TestDbus
):
3126 def __init__(self
, bus
):
3127 TestDbus
.__init
__(self
, bus
)
3130 def __enter__(self
):
3131 gobject
.timeout_add(1, self
.run_test
)
3132 gobject
.timeout_add(5000, self
.timeout
)
3133 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3138 def deviceFound(self
, path
):
3139 logger
.debug("deviceFound: path=%s" % path
)
3143 def run_test(self
, *args
):
3144 logger
.debug("run_test")
3145 p2p
.Find(dbus
.Dictionary({'freq': 2422}))
3151 with
TestDbusP2p(bus
) as t
:
3153 raise Exception("Expected signals not seen")
3155 dev
[1].remove_group()
3158 def test_dbus_p2p_service_discovery(dev
, apdev
):
3159 """D-Bus P2P service discovery"""
3160 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3161 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3163 addr0
= dev
[0].p2p_dev_addr()
3164 addr1
= dev
[1].p2p_dev_addr()
3166 bonjour_query
= dbus
.ByteArray(binascii
.unhexlify('0b5f6166706f766572746370c00c000c01'))
3167 bonjour_response
= dbus
.ByteArray(binascii
.unhexlify('074578616d706c65c027'))
3169 args
= { 'service_type': 'bonjour',
3170 'query': bonjour_query
,
3171 'response': bonjour_response
}
3172 p2p
.AddService(args
)
3174 p2p
.AddService(args
)
3177 p2p
.DeleteService(args
)
3178 raise Exception("Invalid DeleteService() accepted")
3179 except dbus
.exceptions
.DBusException
, e
:
3180 if "InvalidArgs" not in str(e
):
3181 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e
))
3183 args
= { 'service_type': 'bonjour',
3184 'query': bonjour_query
}
3185 p2p
.DeleteService(args
)
3187 p2p
.DeleteService(args
)
3188 raise Exception("Invalid DeleteService() accepted")
3189 except dbus
.exceptions
.DBusException
, e
:
3190 if "InvalidArgs" not in str(e
):
3191 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e
))
3193 args
= { 'service_type': 'upnp',
3195 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
3196 p2p
.AddService(args
)
3197 p2p
.DeleteService(args
)
3199 p2p
.DeleteService(args
)
3200 raise Exception("Invalid DeleteService() accepted")
3201 except dbus
.exceptions
.DBusException
, e
:
3202 if "InvalidArgs" not in str(e
):
3203 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e
))
3205 tests
= [ { 'service_type': 'foo' },
3206 { 'service_type': 'foo', 'query': bonjour_query
},
3207 { 'service_type': 'upnp' },
3208 { 'service_type': 'upnp', 'version': 0x10 },
3209 { 'service_type': 'upnp',
3210 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3212 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3213 { 'service_type': 'upnp', 'foo': 'bar' },
3214 { 'service_type': 'bonjour' },
3215 { 'service_type': 'bonjour', 'query': 'foo' },
3216 { 'service_type': 'bonjour', 'foo': 'bar' } ]
3219 p2p
.DeleteService(args
)
3220 raise Exception("Invalid DeleteService() accepted")
3221 except dbus
.exceptions
.DBusException
, e
:
3222 if "InvalidArgs" not in str(e
):
3223 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e
))
3225 tests
= [ { 'service_type': 'foo' },
3226 { 'service_type': 'upnp' },
3227 { 'service_type': 'upnp', 'version': 0x10 },
3228 { 'service_type': 'upnp',
3229 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3231 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3232 { 'service_type': 'upnp', 'foo': 'bar' },
3233 { 'service_type': 'bonjour' },
3234 { 'service_type': 'bonjour', 'query': 'foo' },
3235 { 'service_type': 'bonjour', 'response': 'foo' },
3236 { 'service_type': 'bonjour', 'query': bonjour_query
},
3237 { 'service_type': 'bonjour', 'response': bonjour_response
},
3238 { 'service_type': 'bonjour', 'query': dbus
.ByteArray(500*'a') },
3239 { 'service_type': 'bonjour', 'foo': 'bar' } ]
3242 p2p
.AddService(args
)
3243 raise Exception("Invalid AddService() accepted")
3244 except dbus
.exceptions
.DBusException
, e
:
3245 if "InvalidArgs" not in str(e
):
3246 raise Exception("Unexpected error message for invalid AddService(): " + str(e
))
3248 args
= { 'tlv': dbus
.ByteArray("\x02\x00\x00\x01") }
3249 ref
= p2p
.ServiceDiscoveryRequest(args
)
3250 p2p
.ServiceDiscoveryCancelRequest(ref
)
3252 p2p
.ServiceDiscoveryCancelRequest(ref
)
3253 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
3254 except dbus
.exceptions
.DBusException
, e
:
3255 if "InvalidArgs" not in str(e
):
3256 raise Exception("Unexpected error message for invalid AddService(): " + str(e
))
3258 p2p
.ServiceDiscoveryCancelRequest(dbus
.UInt64(0))
3259 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
3260 except dbus
.exceptions
.DBusException
, e
:
3261 if "InvalidArgs" not in str(e
):
3262 raise Exception("Unexpected error message for invalid AddService(): " + str(e
))
3264 args
= { 'service_type': 'upnp',
3266 'service': 'ssdp:foo' }
3267 ref
= p2p
.ServiceDiscoveryRequest(args
)
3268 p2p
.ServiceDiscoveryCancelRequest(ref
)
3270 tests
= [ { 'service_type': 'foo' },
3275 { 'service_type': 'upnp',
3276 'service': 'ssdp:foo' },
3277 { 'service_type': 'upnp',
3279 { 'service_type': 'upnp',
3281 'service': 'ssdp:foo',
3282 'peer_object': dbus
.ObjectPath(path
+ "/Peers") },
3283 { 'service_type': 'upnp',
3285 'service': 'ssdp:foo',
3286 'peer_object': path
+ "/Peers" },
3287 { 'service_type': 'upnp',
3289 'service': 'ssdp:foo',
3290 'peer_object': dbus
.ObjectPath(path
+ "/Peers/00112233445566") } ]
3293 p2p
.ServiceDiscoveryRequest(args
)
3294 raise Exception("Invalid ServiceDiscoveryRequest accepted")
3295 except dbus
.exceptions
.DBusException
, e
:
3296 if "InvalidArgs" not in str(e
):
3297 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e
))
3299 args
= { 'foo': 'bar' }
3301 p2p
.ServiceDiscoveryResponse(dbus
.Dictionary(args
, signature
='sv'))
3302 raise Exception("Invalid ServiceDiscoveryResponse accepted")
3303 except dbus
.exceptions
.DBusException
, e
:
3304 if "InvalidArgs" not in str(e
):
3305 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e
))
3307 def test_dbus_p2p_service_discovery_query(dev
, apdev
):
3308 """D-Bus P2P service discovery query"""
3309 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3310 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3312 addr0
= dev
[0].p2p_dev_addr()
3313 dev
[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
3315 addr1
= dev
[1].p2p_dev_addr()
3317 class TestDbusP2p(TestDbus
):
3318 def __init__(self
, bus
):
3319 TestDbus
.__init
__(self
, bus
)
3322 def __enter__(self
):
3323 gobject
.timeout_add(1, self
.run_test
)
3324 gobject
.timeout_add(15000, self
.timeout
)
3325 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3327 self
.add_signal(self
.serviceDiscoveryResponse
,
3328 WPAS_DBUS_IFACE_P2PDEVICE
,
3329 "ServiceDiscoveryResponse", byte_arrays
=True)
3333 def deviceFound(self
, path
):
3334 logger
.debug("deviceFound: path=%s" % path
)
3335 args
= { 'peer_object': path
,
3336 'tlv': dbus
.ByteArray("\x02\x00\x00\x01") }
3337 p2p
.ServiceDiscoveryRequest(args
)
3339 def serviceDiscoveryResponse(self
, sd_request
):
3340 logger
.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request
))
3344 def run_test(self
, *args
):
3345 logger
.debug("run_test")
3346 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social',
3347 'Timeout': dbus
.Int32(10)}))
3353 with
TestDbusP2p(bus
) as t
:
3355 raise Exception("Expected signals not seen")
3357 dev
[1].p2p_stop_find()
3359 def test_dbus_p2p_service_discovery_external(dev
, apdev
):
3360 """D-Bus P2P service discovery with external response"""
3362 _test_dbus_p2p_service_discovery_external(dev
, apdev
)
3364 dev
[0].request("P2P_SERV_DISC_EXTERNAL 0")
3366 def _test_dbus_p2p_service_discovery_external(dev
, apdev
):
3367 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3368 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3370 addr0
= dev
[0].p2p_dev_addr()
3371 addr1
= dev
[1].p2p_dev_addr()
3374 dev
[1].request("P2P_FLUSH")
3375 dev
[1].request("P2P_SERV_DISC_REQ " + addr0
+ " 02000001")
3376 dev
[1].p2p_find(social
=True)
3378 class TestDbusP2p(TestDbus
):
3379 def __init__(self
, bus
):
3380 TestDbus
.__init
__(self
, bus
)
3383 def __enter__(self
):
3384 gobject
.timeout_add(1, self
.run_test
)
3385 gobject
.timeout_add(15000, self
.timeout
)
3386 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3388 self
.add_signal(self
.serviceDiscoveryRequest
,
3389 WPAS_DBUS_IFACE_P2PDEVICE
,
3390 "ServiceDiscoveryRequest")
3394 def deviceFound(self
, path
):
3395 logger
.debug("deviceFound: path=%s" % path
)
3397 def serviceDiscoveryRequest(self
, sd_request
):
3398 logger
.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request
))
3400 args
= { 'peer_object': sd_request
['peer_object'],
3401 'frequency': sd_request
['frequency'],
3402 'dialog_token': sd_request
['dialog_token'],
3403 'tlvs': dbus
.ByteArray(binascii
.unhexlify(resp
)) }
3404 p2p
.ServiceDiscoveryResponse(dbus
.Dictionary(args
, signature
='sv'))
3407 def run_test(self
, *args
):
3408 logger
.debug("run_test")
3409 p2p
.ServiceDiscoveryExternal(1)
3417 with
TestDbusP2p(bus
) as t
:
3419 raise Exception("Expected signals not seen")
3421 ev
= dev
[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout
=5)
3423 raise Exception("Service discovery timed out")
3425 raise Exception("Unexpected address in SD Response: " + ev
)
3426 if ev
.split(' ')[4] != resp
:
3427 raise Exception("Unexpected response data SD Response: " + ev
)
3428 dev
[1].p2p_stop_find()
3431 p2p
.ServiceDiscoveryExternal(0)
3433 def test_dbus_p2p_autogo(dev
, apdev
):
3434 """D-Bus P2P autonomous GO"""
3435 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3436 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3438 addr0
= dev
[0].p2p_dev_addr()
3440 class TestDbusP2p(TestDbus
):
3441 def __init__(self
, bus
):
3442 TestDbus
.__init
__(self
, bus
)
3444 self
.waiting_end
= False
3445 self
.exceptions
= False
3446 self
.deauthorized
= False
3449 def __enter__(self
):
3450 gobject
.timeout_add(1, self
.run_test
)
3451 gobject
.timeout_add(15000, self
.timeout
)
3452 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3454 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3456 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3458 self
.add_signal(self
.persistentGroupAdded
,
3459 WPAS_DBUS_IFACE_P2PDEVICE
,
3460 "PersistentGroupAdded")
3461 self
.add_signal(self
.persistentGroupRemoved
,
3462 WPAS_DBUS_IFACE_P2PDEVICE
,
3463 "PersistentGroupRemoved")
3464 self
.add_signal(self
.provisionDiscoveryRequestDisplayPin
,
3465 WPAS_DBUS_IFACE_P2PDEVICE
,
3466 "ProvisionDiscoveryRequestDisplayPin")
3467 self
.add_signal(self
.staAuthorized
, WPAS_DBUS_IFACE
,
3469 self
.add_signal(self
.staDeauthorized
, WPAS_DBUS_IFACE
,
3474 def groupStarted(self
, properties
):
3475 logger
.debug("groupStarted: " + str(properties
))
3476 self
.group
= properties
['group_object']
3477 self
.g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3478 properties
['interface_object'])
3479 role
= self
.g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Role",
3480 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3482 self
.exceptions
= True
3483 raise Exception("Unexpected role reported: " + role
)
3484 group
= self
.g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Group",
3485 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3486 if group
!= properties
['group_object']:
3487 self
.exceptions
= True
3488 raise Exception("Unexpected Group reported: " + str(group
))
3489 go
= self
.g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "PeerGO",
3490 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3492 self
.exceptions
= True
3493 raise Exception("Unexpected PeerGO value: " + str(go
))
3496 logger
.info("Remove persistent group instance")
3497 group_p2p
= dbus
.Interface(self
.g_if_obj
,
3498 WPAS_DBUS_IFACE_P2PDEVICE
)
3499 group_p2p
.Disconnect()
3501 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3502 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 join")
3504 def groupFinished(self
, properties
):
3505 logger
.debug("groupFinished: " + str(properties
))
3506 if self
.waiting_end
:
3507 logger
.info("Remove persistent group")
3508 p2p
.RemovePersistentGroup(self
.persistent
)
3510 logger
.info("Re-start persistent group")
3511 params
= dbus
.Dictionary({'persistent_group_object':
3514 p2p
.GroupAdd(params
)
3516 def persistentGroupAdded(self
, path
, properties
):
3517 logger
.debug("persistentGroupAdded: %s %s" % (path
, str(properties
)))
3518 self
.persistent
= path
3520 def persistentGroupRemoved(self
, path
):
3521 logger
.debug("persistentGroupRemoved: %s" % path
)
3525 def deviceFound(self
, path
):
3526 logger
.debug("deviceFound: path=%s" % path
)
3527 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
3528 self
.peer
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3529 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3531 logger
.debug('peer properties: ' + str(self
.peer
))
3533 def provisionDiscoveryRequestDisplayPin(self
, peer_object
, pin
):
3534 logger
.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object
, pin
))
3535 self
.peer_path
= peer_object
3536 peer
= binascii
.unhexlify(peer_object
.split('/')[-1])
3541 addr
+= '%02x' % ord(p
)
3543 params
= { 'Role': 'registrar',
3544 'P2PDeviceAddress': self
.peer
['DeviceAddress'],
3545 'Bssid': self
.peer
['DeviceAddress'],
3547 wps
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_WPS
)
3550 self
.exceptions
= True
3551 raise Exception("Invalid WPS.Start() accepted")
3552 except dbus
.exceptions
.DBusException
, e
:
3553 if "InvalidArgs" not in str(e
):
3554 self
.exceptions
= True
3555 raise Exception("Unexpected error message: " + str(e
))
3556 params
= { 'Role': 'registrar',
3557 'P2PDeviceAddress': self
.peer
['DeviceAddress'],
3560 logger
.info("Authorize peer to connect to the group")
3563 def staAuthorized(self
, name
):
3564 logger
.debug("staAuthorized: " + name
)
3565 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.peer_path
)
3566 res
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3567 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3569 logger
.debug("Peer properties: " + str(res
))
3570 if 'Groups' not in res
or len(res
['Groups']) != 1:
3571 self
.exceptions
= True
3572 raise Exception("Unexpected number of peer Groups entries")
3573 if res
['Groups'][0] != self
.group
:
3574 self
.exceptions
= True
3575 raise Exception("Unexpected peer Groups[0] value")
3577 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.group
)
3578 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
3579 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3581 logger
.debug("Group properties: " + str(res
))
3582 if 'Members' not in res
or len(res
['Members']) != 1:
3583 self
.exceptions
= True
3584 raise Exception("Unexpected number of group members")
3586 ext
= dbus
.ByteArray("\x11\x22\x33\x44")
3587 # Earlier implementation of this interface was a bit strange. The
3588 # property is defined to have aay signature and that is what the
3589 # getter returned. However, the setter expected there to be a
3590 # dictionary with 'WPSVendorExtensions' as the key surrounding these
3591 # values.. The current implementations maintains support for that
3592 # for backwards compability reasons. Verify that encoding first.
3593 vals
= dbus
.Dictionary({ 'WPSVendorExtensions': [ ext
]},
3595 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', vals
,
3596 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3597 res
= g_obj
.Get(WPAS_DBUS_GROUP
, 'WPSVendorExtensions',
3598 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3601 self
.exceptions
= True
3602 raise Exception("Unexpected number of vendor extensions")
3604 self
.exceptions
= True
3605 raise Exception("Vendor extension value changed")
3607 # And now verify that the more appropriate encoding is accepted as
3609 res
.append(dbus
.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
3610 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', res
,
3611 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3612 res2
= g_obj
.Get(WPAS_DBUS_GROUP
, 'WPSVendorExtensions',
3613 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3616 self
.exceptions
= True
3617 raise Exception("Unexpected number of vendor extensions")
3618 if res
[0] != res2
[0] or res
[1] != res2
[1]:
3619 self
.exceptions
= True
3620 raise Exception("Vendor extension value changed")
3623 res
.append(dbus
.ByteArray('\xaa\xbb'))
3625 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', res
,
3626 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3627 self
.exceptions
= True
3628 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3629 except dbus
.exceptions
.DBusException
, e
:
3630 if "Error.Failed" not in str(e
):
3631 self
.exceptions
= True
3632 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3634 vals
= dbus
.Dictionary({ 'Foo': [ ext
]}, signature
='sv')
3636 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', vals
,
3637 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3638 self
.exceptions
= True
3639 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3640 except dbus
.exceptions
.DBusException
, e
:
3641 if "InvalidArgs" not in str(e
):
3642 self
.exceptions
= True
3643 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3647 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', vals
,
3648 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3649 self
.exceptions
= True
3650 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3651 except dbus
.exceptions
.DBusException
, e
:
3652 if "Error.Failed" not in str(e
):
3653 self
.exceptions
= True
3654 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3656 vals
= [ [ "foo" ] ]
3658 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', vals
,
3659 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3660 self
.exceptions
= True
3661 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3662 except dbus
.exceptions
.DBusException
, e
:
3663 if "Error.Failed" not in str(e
):
3664 self
.exceptions
= True
3665 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3667 p2p
.RemoveClient({ 'peer': self
.peer_path
})
3669 self
.waiting_end
= True
3670 group_p2p
= dbus
.Interface(self
.g_if_obj
,
3671 WPAS_DBUS_IFACE_P2PDEVICE
)
3672 group_p2p
.Disconnect()
3674 def staDeauthorized(self
, name
):
3675 logger
.debug("staDeauthorized: " + name
)
3676 self
.deauthorized
= True
3678 def run_test(self
, *args
):
3679 logger
.debug("run_test")
3680 params
= dbus
.Dictionary({'persistent': True,
3682 logger
.info("Add a persistent group")
3683 p2p
.GroupAdd(params
)
3687 return self
.done
and self
.deauthorized
and not self
.exceptions
3689 with
TestDbusP2p(bus
) as t
:
3691 raise Exception("Expected signals not seen")
3693 dev
[1].wait_go_ending_session()
3695 def test_dbus_p2p_autogo_pbc(dev
, apdev
):
3696 """D-Bus P2P autonomous GO and PBC"""
3697 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3698 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3700 addr0
= dev
[0].p2p_dev_addr()
3702 class TestDbusP2p(TestDbus
):
3703 def __init__(self
, bus
):
3704 TestDbus
.__init
__(self
, bus
)
3706 self
.waiting_end
= False
3709 def __enter__(self
):
3710 gobject
.timeout_add(1, self
.run_test
)
3711 gobject
.timeout_add(15000, self
.timeout
)
3712 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3714 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3716 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3718 self
.add_signal(self
.provisionDiscoveryPBCRequest
,
3719 WPAS_DBUS_IFACE_P2PDEVICE
,
3720 "ProvisionDiscoveryPBCRequest")
3721 self
.add_signal(self
.staAuthorized
, WPAS_DBUS_IFACE
,
3726 def groupStarted(self
, properties
):
3727 logger
.debug("groupStarted: " + str(properties
))
3728 self
.group
= properties
['group_object']
3729 self
.g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3730 properties
['interface_object'])
3731 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3732 dev1
.global_request("P2P_CONNECT " + addr0
+ " pbc join")
3734 def groupFinished(self
, properties
):
3735 logger
.debug("groupFinished: " + str(properties
))
3739 def deviceFound(self
, path
):
3740 logger
.debug("deviceFound: path=%s" % path
)
3741 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
3742 self
.peer
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3743 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3745 logger
.debug('peer properties: ' + str(self
.peer
))
3747 def provisionDiscoveryPBCRequest(self
, peer_object
):
3748 logger
.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object
)
3749 self
.peer_path
= peer_object
3750 peer
= binascii
.unhexlify(peer_object
.split('/')[-1])
3755 addr
+= '%02x' % ord(p
)
3756 params
= { 'Role': 'registrar',
3757 'P2PDeviceAddress': self
.peer
['DeviceAddress'],
3759 logger
.info("Authorize peer to connect to the group")
3760 wps
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_WPS
)
3763 def staAuthorized(self
, name
):
3764 logger
.debug("staAuthorized: " + name
)
3765 group_p2p
= dbus
.Interface(self
.g_if_obj
,
3766 WPAS_DBUS_IFACE_P2PDEVICE
)
3767 group_p2p
.Disconnect()
3769 def run_test(self
, *args
):
3770 logger
.debug("run_test")
3771 params
= dbus
.Dictionary({'frequency': 2412})
3772 p2p
.GroupAdd(params
)
3778 with
TestDbusP2p(bus
) as t
:
3780 raise Exception("Expected signals not seen")
3782 dev
[1].wait_go_ending_session()
3783 dev
[1].flush_scan_cache()
3785 def test_dbus_p2p_autogo_legacy(dev
, apdev
):
3786 """D-Bus P2P autonomous GO and legacy STA"""
3787 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3788 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3790 addr0
= dev
[0].p2p_dev_addr()
3792 class TestDbusP2p(TestDbus
):
3793 def __init__(self
, bus
):
3794 TestDbus
.__init
__(self
, bus
)
3797 def __enter__(self
):
3798 gobject
.timeout_add(1, self
.run_test
)
3799 gobject
.timeout_add(15000, self
.timeout
)
3800 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3802 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3804 self
.add_signal(self
.staAuthorized
, WPAS_DBUS_IFACE
,
3809 def groupStarted(self
, properties
):
3810 logger
.debug("groupStarted: " + str(properties
))
3811 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3812 properties
['group_object'])
3813 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
3814 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3816 bssid
= ':'.join([binascii
.hexlify(l
) for l
in res
['BSSID']])
3819 params
= { 'Role': 'enrollee',
3822 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3823 properties
['interface_object'])
3824 wps
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_WPS
)
3826 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3827 dev1
.scan_for_bss(bssid
, freq
=2412)
3828 dev1
.request("WPS_PIN " + bssid
+ " " + pin
)
3829 self
.group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3831 def groupFinished(self
, properties
):
3832 logger
.debug("groupFinished: " + str(properties
))
3836 def staAuthorized(self
, name
):
3837 logger
.debug("staAuthorized: " + name
)
3838 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3839 dev1
.request("DISCONNECT")
3840 self
.group_p2p
.Disconnect()
3842 def run_test(self
, *args
):
3843 logger
.debug("run_test")
3844 params
= dbus
.Dictionary({'frequency': 2412})
3845 p2p
.GroupAdd(params
)
3851 with
TestDbusP2p(bus
) as t
:
3853 raise Exception("Expected signals not seen")
3855 def test_dbus_p2p_join(dev
, apdev
):
3856 """D-Bus P2P join an autonomous GO"""
3857 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3858 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3860 addr1
= dev
[1].p2p_dev_addr()
3861 addr2
= dev
[2].p2p_dev_addr()
3862 dev
[1].p2p_start_go(freq
=2412)
3863 dev1_group_ifname
= dev
[1].group_ifname
3866 class TestDbusP2p(TestDbus
):
3867 def __init__(self
, bus
):
3868 TestDbus
.__init
__(self
, bus
)
3873 def __enter__(self
):
3874 gobject
.timeout_add(1, self
.run_test
)
3875 gobject
.timeout_add(15000, self
.timeout
)
3876 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3878 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3880 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3882 self
.add_signal(self
.invitationResult
, WPAS_DBUS_IFACE_P2PDEVICE
,
3887 def deviceFound(self
, path
):
3888 logger
.debug("deviceFound: path=%s" % path
)
3889 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
3890 res
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3891 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3893 logger
.debug('peer properties: ' + str(res
))
3894 if addr2
.replace(':','') in path
:
3896 elif addr1
.replace(':','') in path
:
3898 if self
.peer
and self
.go
:
3899 logger
.info("Join the group")
3901 args
= { 'peer': self
.go
,
3903 'wps_method': 'pin',
3905 pin
= p2p
.Connect(args
)
3907 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3908 dev1
.group_ifname
= dev1_group_ifname
3909 dev1
.group_request("WPS_PIN any " + pin
)
3911 def groupStarted(self
, properties
):
3912 logger
.debug("groupStarted: " + str(properties
))
3913 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3914 properties
['interface_object'])
3915 role
= g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Role",
3916 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3917 if role
!= "client":
3918 raise Exception("Unexpected role reported: " + role
)
3919 group
= g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Group",
3920 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3921 if group
!= properties
['group_object']:
3922 raise Exception("Unexpected Group reported: " + str(group
))
3923 go
= g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "PeerGO",
3924 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3926 raise Exception("Unexpected PeerGO value: " + str(go
))
3928 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3929 properties
['group_object'])
3930 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
3931 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3933 logger
.debug("Group properties: " + str(res
))
3935 ext
= dbus
.ByteArray("\x11\x22\x33\x44")
3937 # Set(WPSVendorExtensions) not allowed for P2P Client
3938 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', res
,
3939 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3940 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3941 except dbus
.exceptions
.DBusException
, e
:
3942 if "Error.Failed: Failed to set property" not in str(e
):
3943 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3945 group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3946 args
= { 'duration1': 30000, 'interval1': 102400,
3947 'duration2': 20000, 'interval2': 102400 }
3948 group_p2p
.PresenceRequest(args
)
3950 args
= { 'peer': self
.peer
}
3951 group_p2p
.Invite(args
)
3953 def groupFinished(self
, properties
):
3954 logger
.debug("groupFinished: " + str(properties
))
3958 def invitationResult(self
, result
):
3959 logger
.debug("invitationResult: " + str(result
))
3960 if result
['status'] != 1:
3961 raise Exception("Unexpected invitation result: " + str(result
))
3962 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3963 dev1
.group_ifname
= dev1_group_ifname
3966 def run_test(self
, *args
):
3967 logger
.debug("run_test")
3968 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
3974 with
TestDbusP2p(bus
) as t
:
3976 raise Exception("Expected signals not seen")
3978 dev
[2].p2p_stop_find()
3980 def test_dbus_p2p_invitation_received(dev
, apdev
):
3981 """D-Bus P2P and InvitationReceived"""
3982 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3983 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3985 form(dev
[0], dev
[1])
3986 addr0
= dev
[0].p2p_dev_addr()
3988 dev
[0].global_request("SET persistent_reconnect 0")
3990 if not dev
[1].discover_peer(addr0
, social
=True):
3991 raise Exception("Peer " + addr0
+ " not found")
3992 peer
= dev
[1].get_peer(addr0
)
3994 class TestDbusP2p(TestDbus
):
3995 def __init__(self
, bus
):
3996 TestDbus
.__init
__(self
, bus
)
3999 def __enter__(self
):
4000 gobject
.timeout_add(1, self
.run_test
)
4001 gobject
.timeout_add(15000, self
.timeout
)
4002 self
.add_signal(self
.invitationReceived
, WPAS_DBUS_IFACE_P2PDEVICE
,
4003 "InvitationReceived")
4007 def invitationReceived(self
, result
):
4008 logger
.debug("invitationReceived: " + str(result
))
4012 def run_test(self
, *args
):
4013 logger
.debug("run_test")
4014 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4015 cmd
= "P2P_INVITE persistent=" + peer
['persistent'] + " peer=" + addr0
4016 dev1
.global_request(cmd
)
4022 with
TestDbusP2p(bus
) as t
:
4024 raise Exception("Expected signals not seen")
4026 dev
[0].p2p_stop_find()
4027 dev
[1].p2p_stop_find()
4029 def test_dbus_p2p_config(dev
, apdev
):
4030 """D-Bus Get/Set P2PDeviceConfig"""
4032 _test_dbus_p2p_config(dev
, apdev
)
4034 dev
[0].request("P2P_SET ssid_postfix ")
4036 def _test_dbus_p2p_config(dev
, apdev
):
4037 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4038 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4040 res
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4041 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4043 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig", res
,
4044 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4045 res2
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4046 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4049 if len(res
) != len(res2
):
4050 raise Exception("Different number of parameters")
4052 if res
[k
] != res2
[k
]:
4053 raise Exception("Parameter %s value changes" % k
)
4055 changes
= { 'SsidPostfix': 'foo',
4056 'VendorExtension': [ dbus
.ByteArray('\x11\x22\x33\x44') ],
4057 'SecondaryDeviceTypes': [ dbus
.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
4058 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4059 dbus
.Dictionary(changes
, signature
='sv'),
4060 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4062 res2
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4063 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4065 logger
.debug("P2PDeviceConfig: " + str(res2
))
4066 if 'VendorExtension' not in res2
or len(res2
['VendorExtension']) != 1:
4067 raise Exception("VendorExtension does not match")
4068 if 'SecondaryDeviceTypes' not in res2
or len(res2
['SecondaryDeviceTypes']) != 1:
4069 raise Exception("SecondaryDeviceType does not match")
4071 changes
= { 'SsidPostfix': '',
4072 'VendorExtension': dbus
.Array([], signature
="ay"),
4073 'SecondaryDeviceTypes': dbus
.Array([], signature
="ay") }
4074 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4075 dbus
.Dictionary(changes
, signature
='sv'),
4076 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4078 res3
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4079 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4081 logger
.debug("P2PDeviceConfig: " + str(res3
))
4082 if 'VendorExtension' in res3
:
4083 raise Exception("VendorExtension not removed")
4084 if 'SecondaryDeviceTypes' in res3
:
4085 raise Exception("SecondaryDeviceType not removed")
4088 dev
[0].request("P2P_SET disabled 1")
4089 if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4090 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4092 raise Exception("Invalid Get(P2PDeviceConfig) accepted")
4093 except dbus
.exceptions
.DBusException
, e
:
4094 if "Error.Failed: P2P is not available for this interface" not in str(e
):
4095 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
4097 dev
[0].request("P2P_SET disabled 0")
4100 dev
[0].request("P2P_SET disabled 1")
4101 changes
= { 'SsidPostfix': 'foo' }
4102 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4103 dbus
.Dictionary(changes
, signature
='sv'),
4104 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4105 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
4106 except dbus
.exceptions
.DBusException
, e
:
4107 if "Error.Failed: P2P is not available for this interface" not in str(e
):
4108 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
4110 dev
[0].request("P2P_SET disabled 0")
4112 tests
= [ { 'DeviceName': 123 },
4113 { 'SsidPostfix': 123 },
4115 for changes
in tests
:
4117 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
4118 dbus
.Dictionary(changes
, signature
='sv'),
4119 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4120 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
4121 except dbus
.exceptions
.DBusException
, e
:
4122 if "InvalidArgs" not in str(e
):
4123 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
4125 def test_dbus_p2p_persistent(dev
, apdev
):
4126 """D-Bus P2P persistent group"""
4127 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4128 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4130 class TestDbusP2p(TestDbus
):
4131 def __init__(self
, bus
):
4132 TestDbus
.__init
__(self
, bus
)
4134 def __enter__(self
):
4135 gobject
.timeout_add(1, self
.run_test
)
4136 gobject
.timeout_add(15000, self
.timeout
)
4137 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4139 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4141 self
.add_signal(self
.persistentGroupAdded
,
4142 WPAS_DBUS_IFACE_P2PDEVICE
,
4143 "PersistentGroupAdded")
4147 def groupStarted(self
, properties
):
4148 logger
.debug("groupStarted: " + str(properties
))
4149 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4150 properties
['interface_object'])
4151 group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4152 group_p2p
.Disconnect()
4154 def groupFinished(self
, properties
):
4155 logger
.debug("groupFinished: " + str(properties
))
4158 def persistentGroupAdded(self
, path
, properties
):
4159 logger
.debug("persistentGroupAdded: %s %s" % (path
, str(properties
)))
4160 self
.persistent
= path
4162 def run_test(self
, *args
):
4163 logger
.debug("run_test")
4164 params
= dbus
.Dictionary({'persistent': True,
4166 logger
.info("Add a persistent group")
4167 p2p
.GroupAdd(params
)
4173 with
TestDbusP2p(bus
) as t
:
4175 raise Exception("Expected signals not seen")
4176 persistent
= t
.persistent
4178 p_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, persistent
)
4179 res
= p_obj
.Get(WPAS_DBUS_PERSISTENT_GROUP
, "Properties",
4180 dbus_interface
=dbus
.PROPERTIES_IFACE
, byte_arrays
=True)
4181 logger
.info("Persistent group Properties: " + str(res
))
4182 vals
= dbus
.Dictionary({ 'ssid': 'DIRECT-foo' }, signature
='sv')
4183 p_obj
.Set(WPAS_DBUS_PERSISTENT_GROUP
, "Properties", vals
,
4184 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4185 res2
= p_obj
.Get(WPAS_DBUS_PERSISTENT_GROUP
, "Properties",
4186 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4187 if len(res
) != len(res2
):
4188 raise Exception("Different number of parameters")
4190 if k
!= 'ssid' and res
[k
] != res2
[k
]:
4191 raise Exception("Parameter %s value changes" % k
)
4192 if res2
['ssid'] != '"DIRECT-foo"':
4193 raise Exception("Unexpected ssid")
4195 args
= dbus
.Dictionary({ 'ssid': 'DIRECT-testing',
4196 'psk': '1234567890' }, signature
='sv')
4197 group
= p2p
.AddPersistentGroup(args
)
4199 groups
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "PersistentGroups",
4200 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4201 if len(groups
) != 2:
4202 raise Exception("Unexpected number of persistent groups: " + str(groups
))
4204 p2p
.RemoveAllPersistentGroups()
4206 groups
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "PersistentGroups",
4207 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4208 if len(groups
) != 0:
4209 raise Exception("Unexpected number of persistent groups: " + str(groups
))
4212 p2p
.RemovePersistentGroup(persistent
)
4213 raise Exception("Invalid RemovePersistentGroup accepted")
4214 except dbus
.exceptions
.DBusException
, e
:
4215 if "NetworkUnknown: There is no such persistent group" not in str(e
):
4216 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e
))
4218 def test_dbus_p2p_reinvoke_persistent(dev
, apdev
):
4219 """D-Bus P2P reinvoke persistent group"""
4220 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4221 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4223 addr0
= dev
[0].p2p_dev_addr()
4225 class TestDbusP2p(TestDbus
):
4226 def __init__(self
, bus
):
4227 TestDbus
.__init
__(self
, bus
)
4229 self
.waiting_end
= False
4231 self
.invited
= False
4233 def __enter__(self
):
4234 gobject
.timeout_add(1, self
.run_test
)
4235 gobject
.timeout_add(15000, self
.timeout
)
4236 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4238 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4240 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4242 self
.add_signal(self
.persistentGroupAdded
,
4243 WPAS_DBUS_IFACE_P2PDEVICE
,
4244 "PersistentGroupAdded")
4245 self
.add_signal(self
.provisionDiscoveryRequestDisplayPin
,
4246 WPAS_DBUS_IFACE_P2PDEVICE
,
4247 "ProvisionDiscoveryRequestDisplayPin")
4248 self
.add_signal(self
.staAuthorized
, WPAS_DBUS_IFACE
,
4253 def groupStarted(self
, properties
):
4254 logger
.debug("groupStarted: " + str(properties
))
4255 self
.g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4256 properties
['interface_object'])
4257 if not self
.invited
:
4258 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4259 properties
['group_object'])
4260 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
4261 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4263 bssid
= ':'.join([binascii
.hexlify(l
) for l
in res
['BSSID']])
4264 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4265 dev1
.scan_for_bss(bssid
, freq
=2412)
4266 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 join")
4268 def groupFinished(self
, properties
):
4269 logger
.debug("groupFinished: " + str(properties
))
4274 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4275 dev1
.global_request("SET persistent_reconnect 1")
4278 args
= { 'persistent_group_object': dbus
.ObjectPath(path
),
4279 'peer': self
.peer_path
}
4281 pin
= p2p
.Invite(args
)
4282 raise Exception("Invalid Invite accepted")
4283 except dbus
.exceptions
.DBusException
, e
:
4284 if "InvalidArgs" not in str(e
):
4285 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
4287 args
= { 'persistent_group_object': self
.persistent
,
4288 'peer': self
.peer_path
}
4289 pin
= p2p
.Invite(args
)
4292 self
.sta_group_ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"],
4294 if self
.sta_group_ev
is None:
4295 raise Exception("P2P-GROUP-STARTED event not seen")
4297 def persistentGroupAdded(self
, path
, properties
):
4298 logger
.debug("persistentGroupAdded: %s %s" % (path
, str(properties
)))
4299 self
.persistent
= path
4301 def deviceFound(self
, path
):
4302 logger
.debug("deviceFound: path=%s" % path
)
4303 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
4304 self
.peer
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
4305 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4308 def provisionDiscoveryRequestDisplayPin(self
, peer_object
, pin
):
4309 logger
.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object
, pin
))
4310 self
.peer_path
= peer_object
4311 peer
= binascii
.unhexlify(peer_object
.split('/')[-1])
4316 addr
+= '%02x' % ord(p
)
4317 params
= { 'Role': 'registrar',
4318 'P2PDeviceAddress': self
.peer
['DeviceAddress'],
4319 'Bssid': self
.peer
['DeviceAddress'],
4322 logger
.info("Authorize peer to connect to the group")
4323 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4324 wps
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_WPS
)
4326 self
.sta_group_ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"],
4328 if self
.sta_group_ev
is None:
4329 raise Exception("P2P-GROUP-STARTED event not seen")
4331 def staAuthorized(self
, name
):
4332 logger
.debug("staAuthorized: " + name
)
4333 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4334 dev1
.group_form_result(self
.sta_group_ev
)
4336 ev
= dev1
.wait_global_event(["P2P-GROUP-REMOVED"], timeout
=10)
4338 raise Exception("Group removal timed out")
4339 group_p2p
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4340 group_p2p
.Disconnect()
4342 def run_test(self
, *args
):
4343 logger
.debug("run_test")
4344 params
= dbus
.Dictionary({'persistent': True,
4346 logger
.info("Add a persistent group")
4347 p2p
.GroupAdd(params
)
4353 with
TestDbusP2p(bus
) as t
:
4355 raise Exception("Expected signals not seen")
4357 def test_dbus_p2p_go_neg_rx(dev
, apdev
):
4358 """D-Bus P2P GO Negotiation receive"""
4359 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4360 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4361 addr0
= dev
[0].p2p_dev_addr()
4363 class TestDbusP2p(TestDbus
):
4364 def __init__(self
, bus
):
4365 TestDbus
.__init
__(self
, bus
)
4368 def __enter__(self
):
4369 gobject
.timeout_add(1, self
.run_test
)
4370 gobject
.timeout_add(15000, self
.timeout
)
4371 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4373 self
.add_signal(self
.goNegotiationRequest
,
4374 WPAS_DBUS_IFACE_P2PDEVICE
,
4375 "GONegotiationRequest",
4377 self
.add_signal(self
.goNegotiationSuccess
,
4378 WPAS_DBUS_IFACE_P2PDEVICE
,
4379 "GONegotiationSuccess",
4381 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4383 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4388 def deviceFound(self
, path
):
4389 logger
.debug("deviceFound: path=%s" % path
)
4391 def goNegotiationRequest(self
, path
, dev_passwd_id
, go_intent
=0):
4392 logger
.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path
, dev_passwd_id
, go_intent
))
4393 if dev_passwd_id
!= 1:
4394 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id
)
4395 args
= { 'peer': path
, 'wps_method': 'display', 'pin': '12345670',
4396 'go_intent': 15, 'persistent': False, 'frequency': 5175 }
4399 raise Exception("Invalid Connect accepted")
4400 except dbus
.exceptions
.DBusException
, e
:
4401 if "ConnectChannelUnsupported" not in str(e
):
4402 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
4404 args
= { 'peer': path
, 'wps_method': 'display', 'pin': '12345670',
4405 'go_intent': 15, 'persistent': False }
4408 def goNegotiationSuccess(self
, properties
):
4409 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4411 def groupStarted(self
, properties
):
4412 logger
.debug("groupStarted: " + str(properties
))
4413 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4414 properties
['interface_object'])
4415 group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4416 group_p2p
.Disconnect()
4418 def groupFinished(self
, properties
):
4419 logger
.debug("groupFinished: " + str(properties
))
4423 def run_test(self
, *args
):
4424 logger
.debug("run_test")
4426 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4427 if not dev1
.discover_peer(addr0
):
4428 raise Exception("Peer not found")
4429 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 enter")
4435 with
TestDbusP2p(bus
) as t
:
4437 raise Exception("Expected signals not seen")
4439 def test_dbus_p2p_go_neg_auth(dev
, apdev
):
4440 """D-Bus P2P GO Negotiation authorized"""
4441 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4442 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4443 addr0
= dev
[0].p2p_dev_addr()
4446 class TestDbusP2p(TestDbus
):
4447 def __init__(self
, bus
):
4448 TestDbus
.__init
__(self
, bus
)
4450 self
.peer_joined
= False
4451 self
.peer_disconnected
= False
4453 def __enter__(self
):
4454 gobject
.timeout_add(1, self
.run_test
)
4455 gobject
.timeout_add(15000, self
.timeout
)
4456 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4458 self
.add_signal(self
.goNegotiationSuccess
,
4459 WPAS_DBUS_IFACE_P2PDEVICE
,
4460 "GONegotiationSuccess",
4462 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4464 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4466 self
.add_signal(self
.staDeauthorized
, WPAS_DBUS_IFACE
,
4468 self
.add_signal(self
.peerJoined
, WPAS_DBUS_GROUP
,
4470 self
.add_signal(self
.peerDisconnected
, WPAS_DBUS_GROUP
,
4475 def deviceFound(self
, path
):
4476 logger
.debug("deviceFound: path=%s" % path
)
4477 args
= { 'peer': path
, 'wps_method': 'keypad',
4478 'go_intent': 15, 'authorize_only': True }
4481 raise Exception("Invalid Connect accepted")
4482 except dbus
.exceptions
.DBusException
, e
:
4483 if "InvalidArgs" not in str(e
):
4484 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
4486 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4487 'go_intent': 15, 'authorize_only': True }
4490 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4491 if not dev1
.discover_peer(addr0
):
4492 raise Exception("Peer not found")
4493 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 display go_intent=0")
4494 ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15)
4496 raise Exception("Group formation timed out")
4497 self
.sta_group_ev
= ev
4499 def goNegotiationSuccess(self
, properties
):
4500 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4502 def groupStarted(self
, properties
):
4503 logger
.debug("groupStarted: " + str(properties
))
4504 self
.g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4505 properties
['interface_object'])
4506 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4507 dev1
.group_form_result(self
.sta_group_ev
)
4510 def staDeauthorized(self
, name
):
4511 logger
.debug("staDeuthorized: " + name
)
4512 group_p2p
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4513 group_p2p
.Disconnect()
4515 def peerJoined(self
, peer
):
4516 logger
.debug("peerJoined: " + peer
)
4517 self
.peer_joined
= True
4519 def peerDisconnected(self
, peer
):
4520 logger
.debug("peerDisconnected: " + peer
)
4521 self
.peer_disconnected
= True
4523 def groupFinished(self
, properties
):
4524 logger
.debug("groupFinished: " + str(properties
))
4528 def run_test(self
, *args
):
4529 logger
.debug("run_test")
4530 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4534 return self
.done
and self
.peer_joined
and self
.peer_disconnected
4536 with
TestDbusP2p(bus
) as t
:
4538 raise Exception("Expected signals not seen")
4540 def test_dbus_p2p_go_neg_init(dev
, apdev
):
4541 """D-Bus P2P GO Negotiation initiation"""
4542 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4543 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4544 addr0
= dev
[0].p2p_dev_addr()
4547 class TestDbusP2p(TestDbus
):
4548 def __init__(self
, bus
):
4549 TestDbus
.__init
__(self
, bus
)
4551 self
.peer_group_added
= False
4552 self
.peer_group_removed
= False
4554 def __enter__(self
):
4555 gobject
.timeout_add(1, self
.run_test
)
4556 gobject
.timeout_add(15000, self
.timeout
)
4557 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4559 self
.add_signal(self
.goNegotiationSuccess
,
4560 WPAS_DBUS_IFACE_P2PDEVICE
,
4561 "GONegotiationSuccess",
4563 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4565 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4567 self
.add_signal(self
.propertiesChanged
, dbus
.PROPERTIES_IFACE
,
4568 "PropertiesChanged")
4572 def deviceFound(self
, path
):
4573 logger
.debug("deviceFound: path=%s" % path
)
4574 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4575 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4579 ev
= dev1
.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout
=15)
4581 raise Exception("Timeout while waiting for GO Neg Request")
4582 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 display go_intent=15")
4583 ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15)
4585 raise Exception("Group formation timed out")
4586 self
.sta_group_ev
= ev
4588 def goNegotiationSuccess(self
, properties
):
4589 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4591 def groupStarted(self
, properties
):
4592 logger
.debug("groupStarted: " + str(properties
))
4593 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4594 properties
['interface_object'])
4595 group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4596 group_p2p
.Disconnect()
4597 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4598 dev1
.group_form_result(self
.sta_group_ev
)
4601 def groupFinished(self
, properties
):
4602 logger
.debug("groupFinished: " + str(properties
))
4605 def propertiesChanged(self
, interface_name
, changed_properties
,
4606 invalidated_properties
):
4607 logger
.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
4608 if interface_name
!= WPAS_DBUS_P2P_PEER
:
4610 if "Groups" not in changed_properties
:
4612 if len(changed_properties
["Groups"]) > 0:
4613 self
.peer_group_added
= True
4614 if len(changed_properties
["Groups"]) == 0:
4615 if not self
.peer_group_added
:
4616 # This is likely a leftover event from an earlier test case,
4617 # ignore it to allow this test case to go through its steps.
4618 logger
.info("Ignore propertiesChanged indicating group removal before group has been added")
4620 self
.peer_group_removed
= True
4623 def run_test(self
, *args
):
4624 logger
.debug("run_test")
4625 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4629 return self
.done
and self
.peer_group_added
and self
.peer_group_removed
4631 with
TestDbusP2p(bus
) as t
:
4633 raise Exception("Expected signals not seen")
4635 def test_dbus_p2p_group_termination_by_go(dev
, apdev
):
4636 """D-Bus P2P group removal on GO terminating the group"""
4637 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4638 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4639 addr0
= dev
[0].p2p_dev_addr()
4642 class TestDbusP2p(TestDbus
):
4643 def __init__(self
, bus
):
4644 TestDbus
.__init
__(self
, bus
)
4646 self
.peer_group_added
= False
4647 self
.peer_group_removed
= False
4649 def __enter__(self
):
4650 gobject
.timeout_add(1, self
.run_test
)
4651 gobject
.timeout_add(15000, self
.timeout
)
4652 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4654 self
.add_signal(self
.goNegotiationSuccess
,
4655 WPAS_DBUS_IFACE_P2PDEVICE
,
4656 "GONegotiationSuccess",
4658 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4660 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4662 self
.add_signal(self
.propertiesChanged
, dbus
.PROPERTIES_IFACE
,
4663 "PropertiesChanged")
4667 def deviceFound(self
, path
):
4668 logger
.debug("deviceFound: path=%s" % path
)
4669 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4670 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4674 ev
= dev1
.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout
=15)
4676 raise Exception("Timeout while waiting for GO Neg Request")
4677 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 display go_intent=15")
4678 ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15)
4680 raise Exception("Group formation timed out")
4681 self
.sta_group_ev
= ev
4683 def goNegotiationSuccess(self
, properties
):
4684 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4686 def groupStarted(self
, properties
):
4687 logger
.debug("groupStarted: " + str(properties
))
4688 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4689 properties
['interface_object'])
4690 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4691 dev1
.group_form_result(self
.sta_group_ev
)
4694 def groupFinished(self
, properties
):
4695 logger
.debug("groupFinished: " + str(properties
))
4698 def propertiesChanged(self
, interface_name
, changed_properties
,
4699 invalidated_properties
):
4700 logger
.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
4701 if interface_name
!= WPAS_DBUS_P2P_PEER
:
4703 if "Groups" not in changed_properties
:
4705 if len(changed_properties
["Groups"]) > 0:
4706 self
.peer_group_added
= True
4707 if len(changed_properties
["Groups"]) == 0 and self
.peer_group_added
:
4708 self
.peer_group_removed
= True
4711 def run_test(self
, *args
):
4712 logger
.debug("run_test")
4713 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4717 return self
.done
and self
.peer_group_added
and self
.peer_group_removed
4719 with
TestDbusP2p(bus
) as t
:
4721 raise Exception("Expected signals not seen")
4723 def test_dbus_p2p_group_idle_timeout(dev
, apdev
):
4724 """D-Bus P2P group removal on idle timeout"""
4726 dev
[0].global_request("SET p2p_group_idle 1")
4727 _test_dbus_p2p_group_idle_timeout(dev
, apdev
)
4729 dev
[0].global_request("SET p2p_group_idle 0")
4731 def _test_dbus_p2p_group_idle_timeout(dev
, apdev
):
4732 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4733 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4734 addr0
= dev
[0].p2p_dev_addr()
4737 class TestDbusP2p(TestDbus
):
4738 def __init__(self
, bus
):
4739 TestDbus
.__init
__(self
, bus
)
4741 self
.group_started
= False
4742 self
.peer_group_added
= False
4743 self
.peer_group_removed
= False
4745 def __enter__(self
):
4746 gobject
.timeout_add(1, self
.run_test
)
4747 gobject
.timeout_add(15000, self
.timeout
)
4748 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4750 self
.add_signal(self
.goNegotiationSuccess
,
4751 WPAS_DBUS_IFACE_P2PDEVICE
,
4752 "GONegotiationSuccess",
4754 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4756 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4758 self
.add_signal(self
.propertiesChanged
, dbus
.PROPERTIES_IFACE
,
4759 "PropertiesChanged")
4763 def deviceFound(self
, path
):
4764 logger
.debug("deviceFound: path=%s" % path
)
4765 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4766 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4770 ev
= dev1
.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout
=15)
4772 raise Exception("Timeout while waiting for GO Neg Request")
4773 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 display go_intent=15")
4774 ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15)
4776 raise Exception("Group formation timed out")
4777 self
.sta_group_ev
= ev
4779 def goNegotiationSuccess(self
, properties
):
4780 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4782 def groupStarted(self
, properties
):
4783 logger
.debug("groupStarted: " + str(properties
))
4784 self
.group_started
= True
4785 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4786 properties
['interface_object'])
4787 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4788 dev1
.group_form_result(self
.sta_group_ev
)
4789 ifaddr
= dev1
.group_request("STA-FIRST").splitlines()[0]
4790 # Force disassociation with different reason code so that the
4791 # P2P Client using D-Bus does not get normal group termination event
4793 dev1
.group_request("DEAUTHENTICATE " + ifaddr
+ " reason=0 test=0")
4796 def groupFinished(self
, properties
):
4797 logger
.debug("groupFinished: " + str(properties
))
4800 def propertiesChanged(self
, interface_name
, changed_properties
,
4801 invalidated_properties
):
4802 logger
.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
4803 if interface_name
!= WPAS_DBUS_P2P_PEER
:
4805 if not self
.group_started
:
4807 if "Groups" not in changed_properties
:
4809 if len(changed_properties
["Groups"]) > 0:
4810 self
.peer_group_added
= True
4811 if len(changed_properties
["Groups"]) == 0:
4812 self
.peer_group_removed
= True
4815 def run_test(self
, *args
):
4816 logger
.debug("run_test")
4817 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4821 return self
.done
and self
.peer_group_added
and self
.peer_group_removed
4823 with
TestDbusP2p(bus
) as t
:
4825 raise Exception("Expected signals not seen")
4827 def test_dbus_p2p_wps_failure(dev
, apdev
):
4828 """D-Bus P2P WPS failure"""
4829 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4830 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4831 addr0
= dev
[0].p2p_dev_addr()
4833 class TestDbusP2p(TestDbus
):
4834 def __init__(self
, bus
):
4835 TestDbus
.__init
__(self
, bus
)
4836 self
.wps_failed
= False
4837 self
.formation_failure
= False
4839 def __enter__(self
):
4840 gobject
.timeout_add(1, self
.run_test
)
4841 gobject
.timeout_add(15000, self
.timeout
)
4842 self
.add_signal(self
.goNegotiationRequest
,
4843 WPAS_DBUS_IFACE_P2PDEVICE
,
4844 "GONegotiationRequest",
4846 self
.add_signal(self
.goNegotiationSuccess
,
4847 WPAS_DBUS_IFACE_P2PDEVICE
,
4848 "GONegotiationSuccess",
4850 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4852 self
.add_signal(self
.wpsFailed
, WPAS_DBUS_IFACE_P2PDEVICE
,
4854 self
.add_signal(self
.groupFormationFailure
,
4855 WPAS_DBUS_IFACE_P2PDEVICE
,
4856 "GroupFormationFailure")
4860 def goNegotiationRequest(self
, path
, dev_passwd_id
, go_intent
=0):
4861 logger
.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path
, dev_passwd_id
, go_intent
))
4862 if dev_passwd_id
!= 1:
4863 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id
)
4864 args
= { 'peer': path
, 'wps_method': 'display', 'pin': '12345670',
4868 def goNegotiationSuccess(self
, properties
):
4869 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4871 def groupStarted(self
, properties
):
4872 logger
.debug("groupStarted: " + str(properties
))
4873 raise Exception("Unexpected GroupStarted")
4875 def wpsFailed(self
, name
, args
):
4876 logger
.debug("wpsFailed - name=%s args=%s" % (name
, str(args
)))
4877 self
.wps_failed
= True
4878 if self
.formation_failure
:
4881 def groupFormationFailure(self
, reason
):
4882 logger
.debug("groupFormationFailure - reason=%s" % reason
)
4883 self
.formation_failure
= True
4887 def run_test(self
, *args
):
4888 logger
.debug("run_test")
4890 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4891 if not dev1
.discover_peer(addr0
):
4892 raise Exception("Peer not found")
4893 dev1
.global_request("P2P_CONNECT " + addr0
+ " 87654321 enter")
4897 return self
.wps_failed
and self
.formation_failure
4899 with
TestDbusP2p(bus
) as t
:
4901 raise Exception("Expected signals not seen")
4903 def test_dbus_p2p_two_groups(dev
, apdev
):
4904 """D-Bus P2P with two concurrent groups"""
4905 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4906 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4908 dev
[0].request("SET p2p_no_group_iface 0")
4909 addr0
= dev
[0].p2p_dev_addr()
4910 addr1
= dev
[1].p2p_dev_addr()
4911 addr2
= dev
[2].p2p_dev_addr()
4912 dev
[1].p2p_start_go(freq
=2412)
4913 dev1_group_ifname
= dev
[1].group_ifname
4915 class TestDbusP2p(TestDbus
):
4916 def __init__(self
, bus
):
4917 TestDbus
.__init
__(self
, bus
)
4923 self
.groups_removed
= False
4925 def __enter__(self
):
4926 gobject
.timeout_add(1, self
.run_test
)
4927 gobject
.timeout_add(15000, self
.timeout
)
4928 self
.add_signal(self
.propertiesChanged
, dbus
.PROPERTIES_IFACE
,
4929 "PropertiesChanged", byte_arrays
=True)
4930 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4932 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4934 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4936 self
.add_signal(self
.peerJoined
, WPAS_DBUS_GROUP
,
4941 def propertiesChanged(self
, interface_name
, changed_properties
,
4942 invalidated_properties
):
4943 logger
.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
4945 def deviceFound(self
, path
):
4946 logger
.debug("deviceFound: path=%s" % path
)
4947 if addr2
.replace(':','') in path
:
4949 elif addr1
.replace(':','') in path
:
4951 if self
.go
and not self
.group1
:
4952 logger
.info("Join the group")
4955 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4956 dev1
.group_ifname
= dev1_group_ifname
4957 dev1
.group_request("WPS_PIN any " + pin
)
4958 args
= { 'peer': self
.go
,
4960 'wps_method': 'pin',
4965 def groupStarted(self
, properties
):
4966 logger
.debug("groupStarted: " + str(properties
))
4967 prop
= if_obj
.GetAll(WPAS_DBUS_IFACE_P2PDEVICE
,
4968 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4969 logger
.debug("p2pdevice properties: " + str(prop
))
4971 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4972 properties
['group_object'])
4973 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
4974 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4976 logger
.debug("Group properties: " + str(res
))
4979 self
.group1
= properties
['group_object']
4980 self
.group1iface
= properties
['interface_object']
4981 self
.g1_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4984 logger
.info("Start autonomous GO")
4985 params
= dbus
.Dictionary({ 'frequency': 2412 })
4986 p2p
.GroupAdd(params
)
4987 elif not self
.group2
:
4988 self
.group2
= properties
['group_object']
4989 self
.group2iface
= properties
['interface_object']
4990 self
.g2_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4992 self
.g2_bssid
= res
['BSSID']
4994 if self
.group1
and self
.group2
:
4995 logger
.info("Authorize peer to join the group")
4996 a2
= binascii
.unhexlify(addr2
.replace(':',''))
4997 params
= { 'Role': 'enrollee',
4998 'P2PDeviceAddress': dbus
.ByteArray(a2
),
4999 'Bssid': dbus
.ByteArray(a2
),
5002 g_wps
= dbus
.Interface(self
.g2_if_obj
, WPAS_DBUS_IFACE_WPS
)
5005 bssid
= ':'.join([binascii
.hexlify(l
) for l
in self
.g2_bssid
])
5006 dev2
= WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
5007 dev2
.scan_for_bss(bssid
, freq
=2412)
5008 dev2
.global_request("P2P_CONNECT " + bssid
+ " 12345670 join freq=2412")
5009 ev
= dev2
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15)
5011 raise Exception("Group join timed out")
5012 self
.dev2_group_ev
= ev
5014 def groupFinished(self
, properties
):
5015 logger
.debug("groupFinished: " + str(properties
))
5017 if self
.group1
== properties
['group_object']:
5019 elif self
.group2
== properties
['group_object']:
5022 if not self
.group1
and not self
.group2
:
5026 def peerJoined(self
, peer
):
5027 logger
.debug("peerJoined: " + peer
)
5028 if self
.groups_removed
:
5030 self
.check_results()
5032 dev2
= WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
5033 dev2
.group_form_result(self
.dev2_group_ev
)
5036 logger
.info("Disconnect group2")
5037 group_p2p
= dbus
.Interface(self
.g2_if_obj
,
5038 WPAS_DBUS_IFACE_P2PDEVICE
)
5039 group_p2p
.Disconnect()
5041 logger
.info("Disconnect group1")
5042 group_p2p
= dbus
.Interface(self
.g1_if_obj
,
5043 WPAS_DBUS_IFACE_P2PDEVICE
)
5044 group_p2p
.Disconnect()
5045 self
.groups_removed
= True
5047 def check_results(self
):
5048 logger
.info("Check results with two concurrent groups in operation")
5050 g1_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.group1
)
5051 res1
= g1_obj
.GetAll(WPAS_DBUS_GROUP
,
5052 dbus_interface
=dbus
.PROPERTIES_IFACE
,
5055 g2_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.group2
)
5056 res2
= g2_obj
.GetAll(WPAS_DBUS_GROUP
,
5057 dbus_interface
=dbus
.PROPERTIES_IFACE
,
5060 logger
.info("group1 = " + self
.group1
)
5061 logger
.debug("Group properties: " + str(res1
))
5063 logger
.info("group2 = " + self
.group2
)
5064 logger
.debug("Group properties: " + str(res2
))
5066 prop
= if_obj
.GetAll(WPAS_DBUS_IFACE_P2PDEVICE
,
5067 dbus_interface
=dbus
.PROPERTIES_IFACE
)
5068 logger
.debug("p2pdevice properties: " + str(prop
))
5070 if res1
['Role'] != 'client':
5071 raise Exception("Group1 role reported incorrectly: " + res1
['Role'])
5072 if res2
['Role'] != 'GO':
5073 raise Exception("Group2 role reported incorrectly: " + res2
['Role'])
5074 if prop
['Role'] != 'device':
5075 raise Exception("p2pdevice role reported incorrectly: " + prop
['Role'])
5077 if len(res2
['Members']) != 1:
5078 raise Exception("Unexpected Members value for group 2")
5080 def run_test(self
, *args
):
5081 logger
.debug("run_test")
5082 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
5088 with
TestDbusP2p(bus
) as t
:
5090 raise Exception("Expected signals not seen")
5092 dev
[1].remove_group()
5094 def test_dbus_p2p_cancel(dev
, apdev
):
5095 """D-Bus P2P Cancel"""
5096 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5097 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
5100 raise Exception("Unexpected p2p.Cancel() success")
5101 except dbus
.exceptions
.DBusException
, e
:
5104 addr0
= dev
[0].p2p_dev_addr()
5107 class TestDbusP2p(TestDbus
):
5108 def __init__(self
, bus
):
5109 TestDbus
.__init
__(self
, bus
)
5112 def __enter__(self
):
5113 gobject
.timeout_add(1, self
.run_test
)
5114 gobject
.timeout_add(15000, self
.timeout
)
5115 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
5120 def deviceFound(self
, path
):
5121 logger
.debug("deviceFound: path=%s" % path
)
5122 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
5129 def run_test(self
, *args
):
5130 logger
.debug("run_test")
5131 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
5137 with
TestDbusP2p(bus
) as t
:
5139 raise Exception("Expected signals not seen")
5141 def test_dbus_p2p_ip_addr(dev
, apdev
):
5142 """D-Bus P2P and IP address parameters"""
5143 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5144 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
5146 vals
= [ ("IpAddrGo", "192.168.43.1"),
5147 ("IpAddrMask", "255.255.255.0"),
5148 ("IpAddrStart", "192.168.43.100"),
5149 ("IpAddrEnd", "192.168.43.199") ]
5150 for field
, value
in vals
:
5151 if_obj
.Set(WPAS_DBUS_IFACE
, field
, value
,
5152 dbus_interface
=dbus
.PROPERTIES_IFACE
)
5153 val
= if_obj
.Get(WPAS_DBUS_IFACE
, field
,
5154 dbus_interface
=dbus
.PROPERTIES_IFACE
)
5156 raise Exception("Unexpected %s value: %s" % (field
, val
))
5158 set_ip_addr_info(dev
[1])
5160 dev
[0].global_request("SET p2p_go_intent 0")
5162 req
= dev
[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
5164 raise Exception("Failed to generate NFC connection handover request")
5165 sel
= dev
[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
5167 raise Exception("Failed to generate NFC connection handover select")
5168 dev
[0].dump_monitor()
5169 dev
[1].dump_monitor()
5170 res
= dev
[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req
+ " " + sel
)
5172 raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
5173 res
= dev
[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req
+ " " + sel
)
5175 raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")
5177 class TestDbusP2p(TestDbus
):
5178 def __init__(self
, bus
):
5179 TestDbus
.__init
__(self
, bus
)
5182 def __enter__(self
):
5183 gobject
.timeout_add(1, self
.run_test
)
5184 gobject
.timeout_add(15000, self
.timeout
)
5185 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
5190 def groupStarted(self
, properties
):
5191 logger
.debug("groupStarted: " + str(properties
))
5194 if 'IpAddrGo' not in properties
:
5195 logger
.info("IpAddrGo missing from GroupStarted")
5196 ip_addr_go
= properties
['IpAddrGo']
5197 addr
= "%d.%d.%d.%d" % (ip_addr_go
[0], ip_addr_go
[1], ip_addr_go
[2], ip_addr_go
[3])
5198 if addr
!= "192.168.42.1":
5199 logger
.info("Unexpected IpAddrGo value: " + addr
)
5202 def run_test(self
, *args
):
5203 logger
.debug("run_test")
5209 with
TestDbusP2p(bus
) as t
:
5211 raise Exception("Expected signals not seen")
5213 def test_dbus_introspect(dev
, apdev
):
5214 """D-Bus introspection"""
5215 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5217 res
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
5218 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
5219 logger
.info("Initial Introspect: " + str(res
))
5220 if res
is None or "Introspectable" not in res
or "GroupStarted" not in res
:
5221 raise Exception("Unexpected initial Introspect response: " + str(res
))
5222 if "FastReauth" not in res
or "PassiveScan" not in res
:
5223 raise Exception("Unexpected initial Introspect response: " + str(res
))
5225 with
alloc_fail(dev
[0], 1, "wpa_dbus_introspect"):
5226 res2
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
5227 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
5228 logger
.info("Introspect: " + str(res2
))
5229 if res2
is not None:
5230 raise Exception("Unexpected Introspect response")
5232 with
alloc_fail(dev
[0], 1, "=add_interface;wpa_dbus_introspect"):
5233 res2
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
5234 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
5235 logger
.info("Introspect: " + str(res2
))
5237 raise Exception("No Introspect response")
5238 if len(res2
) >= len(res
):
5239 raise Exception("Unexpected Introspect response")
5241 with
alloc_fail(dev
[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
5242 res2
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
5243 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
5244 logger
.info("Introspect: " + str(res2
))
5246 raise Exception("No Introspect response")
5247 if len(res2
) >= len(res
):
5248 raise Exception("Unexpected Introspect response")
5250 with
alloc_fail(dev
[0], 2, "=add_interface;wpa_dbus_introspect"):
5251 res2
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
5252 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
5253 logger
.info("Introspect: " + str(res2
))
5255 raise Exception("No Introspect response")
5256 if len(res2
) >= len(res
):
5257 raise Exception("Unexpected Introspect response")
5259 def test_dbus_ap(dev
, apdev
):
5260 """D-Bus AddNetwork for AP mode"""
5261 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5262 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
5264 ssid
= "test-wpa2-psk"
5265 passphrase
= 'qwertyuiop'
5267 class TestDbusConnect(TestDbus
):
5268 def __init__(self
, bus
):
5269 TestDbus
.__init
__(self
, bus
)
5270 self
.started
= False
5272 def __enter__(self
):
5273 gobject
.timeout_add(1, self
.run_connect
)
5274 gobject
.timeout_add(15000, self
.timeout
)
5275 self
.add_signal(self
.networkAdded
, WPAS_DBUS_IFACE
, "NetworkAdded")
5276 self
.add_signal(self
.networkSelected
, WPAS_DBUS_IFACE
,
5278 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
5279 "PropertiesChanged")
5283 def networkAdded(self
, network
, properties
):
5284 logger
.debug("networkAdded: %s" % str(network
))
5285 logger
.debug(str(properties
))
5287 def networkSelected(self
, network
):
5288 logger
.debug("networkSelected: %s" % str(network
))
5289 self
.network_selected
= True
5291 def propertiesChanged(self
, properties
):
5292 logger
.debug("propertiesChanged: %s" % str(properties
))
5293 if 'State' in properties
and properties
['State'] == "completed":
5297 def run_connect(self
, *args
):
5298 logger
.debug("run_connect")
5299 args
= dbus
.Dictionary({ 'ssid': ssid
,
5300 'key_mgmt': 'WPA-PSK',
5303 'frequency': 2412 },
5305 self
.netw
= iface
.AddNetwork(args
)
5306 iface
.SelectNetwork(self
.netw
)
5312 with
TestDbusConnect(bus
) as t
:
5314 raise Exception("Expected signals not seen")
5315 dev
[1].connect(ssid
, psk
=passphrase
, scan_freq
="2412")
5317 def test_dbus_connect_wpa_eap(dev
, apdev
):
5318 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
5319 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5320 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
5322 ssid
= "test-wpa-eap"
5323 params
= hostapd
.wpa_eap_params(ssid
=ssid
)
5325 params
["rsn_pairwise"] = "CCMP"
5326 hapd
= hostapd
.add_ap(apdev
[0], params
)
5328 class TestDbusConnect(TestDbus
):
5329 def __init__(self
, bus
):
5330 TestDbus
.__init
__(self
, bus
)
5333 def __enter__(self
):
5334 gobject
.timeout_add(1, self
.run_connect
)
5335 gobject
.timeout_add(15000, self
.timeout
)
5336 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
5337 "PropertiesChanged")
5338 self
.add_signal(self
.eap
, WPAS_DBUS_IFACE
, "EAP")
5342 def propertiesChanged(self
, properties
):
5343 logger
.debug("propertiesChanged: %s" % str(properties
))
5344 if 'State' in properties
and properties
['State'] == "completed":
5348 def eap(self
, status
, parameter
):
5349 logger
.debug("EAP: status=%s parameter=%s" % (status
, parameter
))
5351 def run_connect(self
, *args
):
5352 logger
.debug("run_connect")
5353 args
= dbus
.Dictionary({ 'ssid': ssid
,
5354 'key_mgmt': 'WPA-EAP',
5357 'password': 'password',
5358 'ca_cert': 'auth_serv/ca.pem',
5359 'phase2': 'auth=MSCHAPV2',
5360 'scan_freq': 2412 },
5362 self
.netw
= iface
.AddNetwork(args
)
5363 iface
.SelectNetwork(self
.netw
)
5369 with
TestDbusConnect(bus
) as t
:
5371 raise Exception("Expected signals not seen")
5373 def test_dbus_ap_scan_2_ap_mode_scan(dev
, apdev
):
5374 """AP_SCAN 2 AP mode and D-Bus Scan()"""
5376 _test_dbus_ap_scan_2_ap_mode_scan(dev
, apdev
)
5378 dev
[0].request("AP_SCAN 1")
5380 def _test_dbus_ap_scan_2_ap_mode_scan(dev
, apdev
):
5381 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5382 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
5384 if "OK" not in dev
[0].request("AP_SCAN 2"):
5385 raise Exception("Failed to set AP_SCAN 2")
5387 id = dev
[0].add_network()
5388 dev
[0].set_network(id, "mode", "2")
5389 dev
[0].set_network_quoted(id, "ssid", "wpas-ap-open")
5390 dev
[0].set_network(id, "key_mgmt", "NONE")
5391 dev
[0].set_network(id, "frequency", "2412")
5392 dev
[0].set_network(id, "scan_freq", "2412")
5393 dev
[0].set_network(id, "disabled", "0")
5394 dev
[0].select_network(id)
5395 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=5)
5397 raise Exception("AP failed to start")
5399 with
fail_test(dev
[0], 1, "wpa_driver_nl80211_scan"):
5400 iface
.Scan({'Type': 'active',
5402 'Channels': [(dbus
.UInt32(2412), dbus
.UInt32(20))]})
5403 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5404 "AP-DISABLED"], timeout
=5)
5406 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
5407 if "AP-DISABLED" in ev
:
5408 raise Exception("Unexpected AP-DISABLED event")
5410 # Wait for the retry to scan happen
5411 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5412 "AP-DISABLED"], timeout
=5)
5414 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5415 if "AP-DISABLED" in ev
:
5416 raise Exception("Unexpected AP-DISABLED event - retry")
5418 dev
[1].connect("wpas-ap-open", key_mgmt
="NONE", scan_freq
="2412")
5419 dev
[1].request("DISCONNECT")
5420 dev
[1].wait_disconnected()
5421 dev
[0].request("DISCONNECT")
5422 dev
[0].wait_disconnected()
5424 def test_dbus_expectdisconnect(dev
, apdev
):
5425 """D-Bus ExpectDisconnect"""
5426 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5427 wpas
= dbus
.Interface(wpas_obj
, WPAS_DBUS_SERVICE
)
5429 params
= { "ssid": "test-open" }
5430 hapd
= hostapd
.add_ap(apdev
[0], params
)
5431 dev
[0].connect("test-open", key_mgmt
="NONE", scan_freq
="2412")
5433 # This does not really verify the behavior other than by going through the
5434 # code path for additional coverage.
5435 wpas
.ExpectDisconnect()
5436 dev
[0].request("DISCONNECT")
5437 dev
[0].wait_disconnected()
5439 def test_dbus_save_config(dev
, apdev
):
5440 """D-Bus SaveConfig"""
5441 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5442 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
5445 raise Exception("SaveConfig() accepted unexpectedly")
5446 except dbus
.exceptions
.DBusException
, e
:
5447 if not str(e
).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
5448 raise Exception("Unexpected error message for SaveConfig(): " + str(e
))
5450 def test_dbus_vendor_elem(dev
, apdev
):
5451 """D-Bus vendor element operations"""
5453 _test_dbus_vendor_elem(dev
, apdev
)
5455 dev
[0].request("VENDOR_ELEM_REMOVE 1 *")
5457 def _test_dbus_vendor_elem(dev
, apdev
):
5458 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5459 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
5461 dev
[0].request("VENDOR_ELEM_REMOVE 1 *")
5464 ie
= dbus
.ByteArray("\x00\x00")
5465 iface
.VendorElemAdd(-1, ie
)
5466 raise Exception("Invalid VendorElemAdd() accepted")
5467 except dbus
.exceptions
.DBusException
, e
:
5468 if "InvalidArgs" not in str(e
) or "Invalid ID" not in str(e
):
5469 raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e
))
5472 ie
= dbus
.ByteArray("")
5473 iface
.VendorElemAdd(1, ie
)
5474 raise Exception("Invalid VendorElemAdd() accepted")
5475 except dbus
.exceptions
.DBusException
, e
:
5476 if "InvalidArgs" not in str(e
) or "Invalid value" not in str(e
):
5477 raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e
))
5480 ie
= dbus
.ByteArray("\x00\x01")
5481 iface
.VendorElemAdd(1, ie
)
5482 raise Exception("Invalid VendorElemAdd() accepted")
5483 except dbus
.exceptions
.DBusException
, e
:
5484 if "InvalidArgs" not in str(e
) or "Parse error" not in str(e
):
5485 raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e
))
5488 iface
.VendorElemGet(-1)
5489 raise Exception("Invalid VendorElemGet() accepted")
5490 except dbus
.exceptions
.DBusException
, e
:
5491 if "InvalidArgs" not in str(e
) or "Invalid ID" not in str(e
):
5492 raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e
))
5495 iface
.VendorElemGet(1)
5496 raise Exception("Invalid VendorElemGet() accepted")
5497 except dbus
.exceptions
.DBusException
, e
:
5498 if "InvalidArgs" not in str(e
) or "ID value does not exist" not in str(e
):
5499 raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e
))
5502 ie
= dbus
.ByteArray("\x00\x00")
5503 iface
.VendorElemRem(-1, ie
)
5504 raise Exception("Invalid VendorElemRemove() accepted")
5505 except dbus
.exceptions
.DBusException
, e
:
5506 if "InvalidArgs" not in str(e
) or "Invalid ID" not in str(e
):
5507 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e
))
5510 ie
= dbus
.ByteArray("")
5511 iface
.VendorElemRem(1, ie
)
5512 raise Exception("Invalid VendorElemRemove() accepted")
5513 except dbus
.exceptions
.DBusException
, e
:
5514 if "InvalidArgs" not in str(e
) or "Invalid value" not in str(e
):
5515 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e
))
5517 iface
.VendorElemRem(1, "*")
5519 ie
= dbus
.ByteArray("\x00\x01\x00")
5520 iface
.VendorElemAdd(1, ie
)
5522 val
= iface
.VendorElemGet(1)
5523 if len(val
) != len(ie
):
5524 raise Exception("Unexpected VendorElemGet length")
5525 for i
in range(len(val
)):
5526 if val
[i
] != dbus
.Byte(ie
[i
]):
5527 raise Exception("Unexpected VendorElemGet data")
5529 ie2
= dbus
.ByteArray("\xe0\x00")
5530 iface
.VendorElemAdd(1, ie2
)
5533 val
= iface
.VendorElemGet(1)
5534 if len(val
) != len(ies
):
5535 raise Exception("Unexpected VendorElemGet length[2]")
5536 for i
in range(len(val
)):
5537 if val
[i
] != dbus
.Byte(ies
[i
]):
5538 raise Exception("Unexpected VendorElemGet data[2]")
5541 test_ie
= dbus
.ByteArray("\x01\x01")
5542 iface
.VendorElemRem(1, test_ie
)
5543 raise Exception("Invalid VendorElemRemove() accepted")
5544 except dbus
.exceptions
.DBusException
, e
:
5545 if "InvalidArgs" not in str(e
) or "Parse error" not in str(e
):
5546 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e
))
5548 iface
.VendorElemRem(1, ie
)
5549 val
= iface
.VendorElemGet(1)
5550 if len(val
) != len(ie2
):
5551 raise Exception("Unexpected VendorElemGet length[3]")
5553 iface
.VendorElemRem(1, "*")
5555 iface
.VendorElemGet(1)
5556 raise Exception("Invalid VendorElemGet() accepted after removal")
5557 except dbus
.exceptions
.DBusException
, e
:
5558 if "InvalidArgs" not in str(e
) or "ID value does not exist" not in str(e
):
5559 raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e
))
5561 def test_dbus_assoc_reject(dev
, apdev
):
5562 """D-Bus AssocStatusCode"""
5563 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5564 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
5567 params
= { "ssid": ssid
,
5568 "max_listen_interval": "1" }
5569 hapd
= hostapd
.add_ap(apdev
[0], params
)
5571 class TestDbusConnect(TestDbus
):
5572 def __init__(self
, bus
):
5573 TestDbus
.__init
__(self
, bus
)
5574 self
.assoc_status_seen
= False
5577 def __enter__(self
):
5578 gobject
.timeout_add(1, self
.run_connect
)
5579 gobject
.timeout_add(15000, self
.timeout
)
5580 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
5581 "PropertiesChanged")
5585 def propertiesChanged(self
, properties
):
5586 logger
.debug("propertiesChanged: %s" % str(properties
))
5587 if 'AssocStatusCode' in properties
:
5588 status
= properties
['AssocStatusCode']
5590 logger
.info("Unexpected status code: " + str(status
))
5592 self
.assoc_status_seen
= True
5596 def run_connect(self
, *args
):
5597 args
= dbus
.Dictionary({ 'ssid': ssid
,
5599 'scan_freq': 2412 },
5601 self
.netw
= iface
.AddNetwork(args
)
5602 iface
.SelectNetwork(self
.netw
)
5606 return self
.assoc_status_seen
5608 with
TestDbusConnect(bus
) as t
:
5610 raise Exception("Expected signals not seen")
5612 def test_dbus_mesh(dev
, apdev
):
5614 check_mesh_support(dev
[0])
5615 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
5616 mesh
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_MESH
)
5618 add_open_mesh_network(dev
[1])
5619 addr1
= dev
[1].own_addr()
5621 class TestDbusMesh(TestDbus
):
5622 def __init__(self
, bus
):
5623 TestDbus
.__init
__(self
, bus
)
5626 def __enter__(self
):
5627 gobject
.timeout_add(1, self
.run_test
)
5628 gobject
.timeout_add(15000, self
.timeout
)
5629 self
.add_signal(self
.meshGroupStarted
, WPAS_DBUS_IFACE_MESH
,
5631 self
.add_signal(self
.meshGroupRemoved
, WPAS_DBUS_IFACE_MESH
,
5633 self
.add_signal(self
.meshPeerConnected
, WPAS_DBUS_IFACE_MESH
,
5634 "MeshPeerConnected")
5635 self
.add_signal(self
.meshPeerDisconnected
, WPAS_DBUS_IFACE_MESH
,
5636 "MeshPeerDisconnected")
5640 def meshGroupStarted(self
, args
):
5641 logger
.debug("MeshGroupStarted: " + str(args
))
5643 def meshGroupRemoved(self
, args
):
5644 logger
.debug("MeshGroupRemoved: " + str(args
))
5648 def meshPeerConnected(self
, args
):
5649 logger
.debug("MeshPeerConnected: " + str(args
))
5651 res
= if_obj
.Get(WPAS_DBUS_IFACE_MESH
, 'MeshPeers',
5652 dbus_interface
=dbus
.PROPERTIES_IFACE
,
5654 logger
.debug("MeshPeers: " + str(res
))
5656 raise Exception("Unexpected number of MeshPeer values")
5657 if binascii
.hexlify(res
[0]) != addr1
.replace(':', ''):
5658 raise Exception("Unexpected peer address")
5660 res
= if_obj
.Get(WPAS_DBUS_IFACE_MESH
, 'MeshGroup',
5661 dbus_interface
=dbus
.PROPERTIES_IFACE
,
5663 logger
.debug("MeshGroup: " + str(res
))
5664 if res
!= "wpas-mesh-open":
5665 raise Exception("Unexpected MeshGroup")
5666 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
5667 dev1
.mesh_group_remove()
5669 def meshPeerDisconnected(self
, args
):
5670 logger
.debug("MeshPeerDisconnected: " + str(args
))
5671 dev0
= WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
5672 dev0
.mesh_group_remove()
5674 def run_test(self
, *args
):
5675 logger
.debug("run_test")
5676 dev0
= WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
5677 add_open_mesh_network(dev0
)
5683 with
TestDbusMesh(bus
) as t
:
5685 raise Exception("Expected signals not seen")