]>
git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/test_ap_wps.py
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
11 logger
= logging
.getLogger()
17 import xml
.etree
.ElementTree
as ET
23 def test_ap_wps_init(dev
, apdev
):
24 """Initial AP configuration with first WPS Enrollee"""
26 hostapd
.add_ap(apdev
[0]['ifname'],
27 { "ssid": ssid
, "eap_server": "1", "wps_state": "1" })
28 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
29 logger
.info("WPS provisioning step")
30 hapd
.request("WPS_PBC")
31 if "PBC Status: Active" not in hapd
.request("WPS_GET_STATUS"):
32 raise Exception("PBC status not shown correctly")
34 id = dev
[0].add_network()
35 dev
[0].set_network_quoted(id, "ssid", "home")
36 dev
[0].set_network_quoted(id, "psk", "12345678")
37 dev
[0].request("ENABLE_NETWORK %s no-connect" % id)
39 id = dev
[0].add_network()
40 dev
[0].set_network_quoted(id, "ssid", "home2")
41 dev
[0].set_network(id, "bssid", "00:11:22:33:44:55")
42 dev
[0].set_network(id, "key_mgmt", "NONE")
43 dev
[0].request("ENABLE_NETWORK %s no-connect" % id)
45 dev
[0].request("WPS_PBC")
46 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
48 raise Exception("Association with the AP timed out")
49 status
= dev
[0].get_status()
50 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
51 raise Exception("Not fully connected")
52 if status
['ssid'] != ssid
:
53 raise Exception("Unexpected SSID")
54 if status
['pairwise_cipher'] != 'CCMP':
55 raise Exception("Unexpected encryption configuration")
56 if status
['key_mgmt'] != 'WPA2-PSK':
57 raise Exception("Unexpected key_mgmt")
59 status
= hapd
.request("WPS_GET_STATUS")
60 if "PBC Status: Disabled" not in status
:
61 raise Exception("PBC status not shown correctly")
62 if "Last WPS result: Success" not in status
:
63 raise Exception("Last WPS result not shown correctly")
64 if "Peer Address: " + dev
[0].p2p_interface_addr() not in status
:
65 raise Exception("Peer address not shown correctly")
66 conf
= hapd
.request("GET_CONFIG")
67 if "wps_state=configured" not in conf
:
68 raise Exception("AP not in WPS configured state")
69 if "rsn_pairwise_cipher=CCMP TKIP" not in conf
:
70 raise Exception("Unexpected rsn_pairwise_cipher")
71 if "wpa_pairwise_cipher=CCMP TKIP" not in conf
:
72 raise Exception("Unexpected wpa_pairwise_cipher")
73 if "group_cipher=TKIP" not in conf
:
74 raise Exception("Unexpected group_cipher")
76 if len(dev
[0].list_networks()) != 3:
77 raise Exception("Unexpected number of network blocks")
79 def test_ap_wps_init_2ap_pbc(dev
, apdev
):
80 """Initial two-radio AP configuration with first WPS PBC Enrollee"""
82 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "1" }
83 hostapd
.add_ap(apdev
[0]['ifname'], params
)
84 hostapd
.add_ap(apdev
[1]['ifname'], params
)
85 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
86 logger
.info("WPS provisioning step")
87 hapd
.request("WPS_PBC")
88 dev
[0].scan(freq
="2412")
89 bss
= dev
[0].get_bss(apdev
[0]['bssid'])
90 if "[WPS-PBC]" not in bss
['flags']:
91 raise Exception("WPS-PBC flag missing from AP1")
92 bss
= dev
[0].get_bss(apdev
[1]['bssid'])
93 if "[WPS-PBC]" not in bss
['flags']:
94 raise Exception("WPS-PBC flag missing from AP2")
96 dev
[0].request("SET wps_cred_processing 2")
97 dev
[0].request("WPS_PBC")
98 ev
= dev
[0].wait_event(["WPS-CRED-RECEIVED"], timeout
=30)
99 dev
[0].request("SET wps_cred_processing 0")
101 raise Exception("WPS cred event not seen")
103 raise Exception("WPS attributes not included in the cred event")
104 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
106 raise Exception("Association with the AP timed out")
108 dev
[1].scan(freq
="2412")
109 bss
= dev
[1].get_bss(apdev
[0]['bssid'])
110 if "[WPS-PBC]" in bss
['flags']:
111 raise Exception("WPS-PBC flag not cleared from AP1")
112 bss
= dev
[1].get_bss(apdev
[1]['bssid'])
113 if "[WPS-PBC]" in bss
['flags']:
114 raise Exception("WPS-PBC flag bit ckeared from AP2")
116 def test_ap_wps_init_2ap_pin(dev
, apdev
):
117 """Initial two-radio AP configuration with first WPS PIN Enrollee"""
119 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "1" }
120 hostapd
.add_ap(apdev
[0]['ifname'], params
)
121 hostapd
.add_ap(apdev
[1]['ifname'], params
)
122 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
123 logger
.info("WPS provisioning step")
124 pin
= dev
[0].wps_read_pin()
125 hapd
.request("WPS_PIN any " + pin
)
126 dev
[0].scan(freq
="2412")
127 bss
= dev
[0].get_bss(apdev
[0]['bssid'])
128 if "[WPS-AUTH]" not in bss
['flags']:
129 raise Exception("WPS-AUTH flag missing from AP1")
130 bss
= dev
[0].get_bss(apdev
[1]['bssid'])
131 if "[WPS-AUTH]" not in bss
['flags']:
132 raise Exception("WPS-AUTH flag missing from AP2")
133 dev
[0].dump_monitor()
134 dev
[0].request("WPS_PIN any " + pin
)
135 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
137 raise Exception("Association with the AP timed out")
139 dev
[1].scan(freq
="2412")
140 bss
= dev
[1].get_bss(apdev
[0]['bssid'])
141 if "[WPS-AUTH]" in bss
['flags']:
142 raise Exception("WPS-AUTH flag not cleared from AP1")
143 bss
= dev
[1].get_bss(apdev
[1]['bssid'])
144 if "[WPS-AUTH]" in bss
['flags']:
145 raise Exception("WPS-AUTH flag bit ckeared from AP2")
147 def test_ap_wps_init_through_wps_config(dev
, apdev
):
148 """Initial AP configuration using wps_config command"""
149 ssid
= "test-wps-init-config"
150 hostapd
.add_ap(apdev
[0]['ifname'],
151 { "ssid": ssid
, "eap_server": "1", "wps_state": "1" })
152 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
153 if "FAIL" in hapd
.request("WPS_CONFIG " + ssid
.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")):
154 raise Exception("WPS_CONFIG command failed")
155 ev
= hapd
.wait_event(["WPS-NEW-AP-SETTINGS"], timeout
=5)
157 raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")
158 # It takes some time for the AP to update Beacon and Probe Response frames,
159 # so wait here before requesting the scan to be started to avoid adding
160 # extra five second wait to the test due to fetching obsolete scan results.
163 dev
[0].connect(ssid
, psk
="12345678", scan_freq
="2412", proto
="WPA2",
164 pairwise
="CCMP", group
="CCMP")
166 def test_ap_wps_conf(dev
, apdev
):
167 """WPS PBC provisioning with configured AP"""
168 ssid
= "test-wps-conf"
169 hostapd
.add_ap(apdev
[0]['ifname'],
170 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
171 "wpa_passphrase": "12345678", "wpa": "2",
172 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
173 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
174 logger
.info("WPS provisioning step")
175 hapd
.request("WPS_PBC")
176 dev
[0].dump_monitor()
177 dev
[0].request("WPS_PBC")
178 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
180 raise Exception("Association with the AP timed out")
181 status
= dev
[0].get_status()
182 if status
['wpa_state'] != 'COMPLETED':
183 raise Exception("Not fully connected")
184 if status
['bssid'] != apdev
[0]['bssid']:
185 raise Exception("Unexpected BSSID")
186 if status
['ssid'] != ssid
:
187 raise Exception("Unexpected SSID")
188 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'CCMP':
189 raise Exception("Unexpected encryption configuration")
190 if status
['key_mgmt'] != 'WPA2-PSK':
191 raise Exception("Unexpected key_mgmt")
193 sta
= hapd
.get_sta(dev
[0].p2p_interface_addr())
194 if 'wpsDeviceName' not in sta
or sta
['wpsDeviceName'] != "Device A":
195 raise Exception("Device name not available in STA command")
197 def test_ap_wps_conf_5ghz(dev
, apdev
):
198 """WPS PBC provisioning with configured AP on 5 GHz band"""
200 ssid
= "test-wps-conf"
201 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
202 "wpa_passphrase": "12345678", "wpa": "2",
203 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
204 "country_code": "FI", "hw_mode": "a", "channel": "36" }
205 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
206 logger
.info("WPS provisioning step")
207 hapd
.request("WPS_PBC")
208 dev
[0].request("WPS_PBC")
209 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
211 raise Exception("Association with the AP timed out")
213 sta
= hapd
.get_sta(dev
[0].p2p_interface_addr())
214 if 'wpsDeviceName' not in sta
or sta
['wpsDeviceName'] != "Device A":
215 raise Exception("Device name not available in STA command")
217 subprocess
.call(['sudo', 'iw', 'reg', 'set', '00'])
219 def test_ap_wps_conf_chan14(dev
, apdev
):
220 """WPS PBC provisioning with configured AP on channel 14"""
222 ssid
= "test-wps-conf"
223 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
224 "wpa_passphrase": "12345678", "wpa": "2",
225 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
226 "country_code": "JP", "hw_mode": "b", "channel": "14" }
227 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
228 logger
.info("WPS provisioning step")
229 hapd
.request("WPS_PBC")
230 dev
[0].request("WPS_PBC")
231 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
233 raise Exception("Association with the AP timed out")
235 sta
= hapd
.get_sta(dev
[0].p2p_interface_addr())
236 if 'wpsDeviceName' not in sta
or sta
['wpsDeviceName'] != "Device A":
237 raise Exception("Device name not available in STA command")
239 subprocess
.call(['sudo', 'iw', 'reg', 'set', '00'])
241 def test_ap_wps_twice(dev
, apdev
):
242 """WPS provisioning with twice to change passphrase"""
243 ssid
= "test-wps-twice"
244 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
245 "wpa_passphrase": "12345678", "wpa": "2",
246 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
247 hostapd
.add_ap(apdev
[0]['ifname'], params
)
248 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
249 logger
.info("WPS provisioning step")
250 hapd
.request("WPS_PBC")
251 dev
[0].dump_monitor()
252 dev
[0].request("WPS_PBC")
253 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
255 raise Exception("Association with the AP timed out")
256 dev
[0].request("DISCONNECT")
258 logger
.info("Restart AP with different passphrase and re-run WPS")
259 hapd_global
= hostapd
.HostapdGlobal()
260 hapd_global
.remove(apdev
[0]['ifname'])
261 params
['wpa_passphrase'] = 'another passphrase'
262 hostapd
.add_ap(apdev
[0]['ifname'], params
)
263 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
264 logger
.info("WPS provisioning step")
265 hapd
.request("WPS_PBC")
266 dev
[0].dump_monitor()
267 dev
[0].request("WPS_PBC any")
268 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
270 raise Exception("Association with the AP timed out")
271 networks
= dev
[0].list_networks()
272 if len(networks
) > 1:
273 raise Exception("Unexpected duplicated network block present")
275 def test_ap_wps_incorrect_pin(dev
, apdev
):
276 """WPS PIN provisioning with incorrect PIN"""
277 ssid
= "test-wps-incorrect-pin"
278 hostapd
.add_ap(apdev
[0]['ifname'],
279 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
280 "wpa_passphrase": "12345678", "wpa": "2",
281 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
282 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
284 logger
.info("WPS provisioning attempt 1")
285 hapd
.request("WPS_PIN any 12345670")
286 dev
[0].dump_monitor()
287 dev
[0].request("WPS_PIN any 55554444")
288 ev
= dev
[0].wait_event(["WPS-FAIL"], timeout
=30)
290 raise Exception("WPS operation timed out")
291 if "config_error=18" not in ev
:
292 raise Exception("Incorrect config_error reported")
293 if "msg=8" not in ev
:
294 raise Exception("PIN error detected on incorrect message")
295 ev
= dev
[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
297 raise Exception("Timeout on disconnection event")
298 dev
[0].request("WPS_CANCEL")
299 # if a scan was in progress, wait for it to complete before trying WPS again
300 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)
302 status
= hapd
.request("WPS_GET_STATUS")
303 if "Last WPS result: Failed" not in status
:
304 raise Exception("WPS failure result not shown correctly")
306 logger
.info("WPS provisioning attempt 2")
307 hapd
.request("WPS_PIN any 12345670")
308 dev
[0].dump_monitor()
309 dev
[0].request("WPS_PIN any 12344444")
310 ev
= dev
[0].wait_event(["WPS-FAIL"], timeout
=30)
312 raise Exception("WPS operation timed out")
313 if "config_error=18" not in ev
:
314 raise Exception("Incorrect config_error reported")
315 if "msg=10" not in ev
:
316 raise Exception("PIN error detected on incorrect message")
317 ev
= dev
[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
319 raise Exception("Timeout on disconnection event")
321 def test_ap_wps_conf_pin(dev
, apdev
):
322 """WPS PIN provisioning with configured AP"""
323 ssid
= "test-wps-conf-pin"
324 hostapd
.add_ap(apdev
[0]['ifname'],
325 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
326 "wpa_passphrase": "12345678", "wpa": "2",
327 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
328 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
329 logger
.info("WPS provisioning step")
330 pin
= dev
[0].wps_read_pin()
331 hapd
.request("WPS_PIN any " + pin
)
332 dev
[0].dump_monitor()
333 dev
[0].request("WPS_PIN any " + pin
)
334 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
336 raise Exception("Association with the AP timed out")
337 status
= dev
[0].get_status()
338 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
339 raise Exception("Not fully connected")
340 if status
['ssid'] != ssid
:
341 raise Exception("Unexpected SSID")
342 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'CCMP':
343 raise Exception("Unexpected encryption configuration")
344 if status
['key_mgmt'] != 'WPA2-PSK':
345 raise Exception("Unexpected key_mgmt")
347 dev
[1].scan(freq
="2412")
348 bss
= dev
[1].get_bss(apdev
[0]['bssid'])
349 if "[WPS-AUTH]" in bss
['flags']:
350 raise Exception("WPS-AUTH flag not cleared")
351 logger
.info("Try to connect from another station using the same PIN")
352 pin
= dev
[1].request("WPS_PIN any")
353 ev
= dev
[1].wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"], timeout
=30)
355 raise Exception("Operation timed out")
356 if "WPS-M2D" not in ev
:
357 raise Exception("Unexpected WPS operation started")
358 hapd
.request("WPS_PIN any " + pin
)
359 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
361 raise Exception("Association with the AP timed out")
363 def test_ap_wps_conf_pin_2sta(dev
, apdev
):
364 """Two stations trying to use WPS PIN at the same time"""
365 ssid
= "test-wps-conf-pin2"
366 hostapd
.add_ap(apdev
[0]['ifname'],
367 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
368 "wpa_passphrase": "12345678", "wpa": "2",
369 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
370 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
371 logger
.info("WPS provisioning step")
374 hapd
.request("WPS_PIN " + dev
[0].get_status_field("uuid") + " " + pin
)
375 hapd
.request("WPS_PIN " + dev
[1].get_status_field("uuid") + " " + pin
)
376 dev
[0].dump_monitor()
377 dev
[1].dump_monitor()
378 dev
[0].request("WPS_PIN any " + pin
)
379 dev
[1].request("WPS_PIN any " + pin
)
380 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
382 raise Exception("Association with the AP timed out")
383 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
385 raise Exception("Association with the AP timed out")
387 def test_ap_wps_conf_pin_timeout(dev
, apdev
):
388 """WPS PIN provisioning with configured AP timing out PIN"""
389 ssid
= "test-wps-conf-pin"
390 hostapd
.add_ap(apdev
[0]['ifname'],
391 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
392 "wpa_passphrase": "12345678", "wpa": "2",
393 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
394 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
395 addr
= dev
[0].p2p_interface_addr()
396 pin
= dev
[0].wps_read_pin()
397 if "FAIL" not in hapd
.request("WPS_PIN "):
398 raise Exception("Unexpected success on invalid WPS_PIN")
399 hapd
.request("WPS_PIN any " + pin
+ " 1")
401 dev
[0].request("WPS_PIN any " + pin
)
402 ev
= hapd
.wait_event(["WPS-PIN-NEEDED"], timeout
=20)
404 raise Exception("WPS-PIN-NEEDED event timed out")
405 ev
= dev
[0].wait_event(["WPS-M2D"])
407 raise Exception("M2D not reported")
408 dev
[0].request("WPS_CANCEL")
410 hapd
.request("WPS_PIN any " + pin
+ " 20 " + addr
)
411 dev
[0].request("WPS_PIN any " + pin
)
412 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
414 raise Exception("Association with the AP timed out")
416 def test_ap_wps_reg_connect(dev
, apdev
):
417 """WPS registrar using AP PIN to connect"""
418 ssid
= "test-wps-reg-ap-pin"
420 hostapd
.add_ap(apdev
[0]['ifname'],
421 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
422 "wpa_passphrase": "12345678", "wpa": "2",
423 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
425 logger
.info("WPS provisioning step")
426 dev
[0].dump_monitor()
427 dev
[0].wps_reg(apdev
[0]['bssid'], appin
)
428 status
= dev
[0].get_status()
429 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
430 raise Exception("Not fully connected")
431 if status
['ssid'] != ssid
:
432 raise Exception("Unexpected SSID")
433 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'CCMP':
434 raise Exception("Unexpected encryption configuration")
435 if status
['key_mgmt'] != 'WPA2-PSK':
436 raise Exception("Unexpected key_mgmt")
438 def check_wps_reg_failure(dev
, ap
, appin
):
439 dev
.request("WPS_REG " + ap
['bssid'] + " " + appin
)
440 ev
= dev
.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout
=15)
442 raise Exception("WPS operation timed out")
443 if "WPS-SUCCESS" in ev
:
444 raise Exception("WPS operation succeeded unexpectedly")
445 if "config_error=15" not in ev
:
446 raise Exception("WPS setup locked state was not reported correctly")
448 def test_ap_wps_random_ap_pin(dev
, apdev
):
449 """WPS registrar using random AP PIN"""
450 ssid
= "test-wps-reg-random-ap-pin"
451 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
452 hostapd
.add_ap(apdev
[0]['ifname'],
453 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
454 "wpa_passphrase": "12345678", "wpa": "2",
455 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
456 "device_name": "Wireless AP", "manufacturer": "Company",
457 "model_name": "WAP", "model_number": "123",
458 "serial_number": "12345", "device_type": "6-0050F204-1",
459 "os_version": "01020300",
460 "config_methods": "label push_button",
461 "uuid": ap_uuid
, "upnp_iface": "lo" })
462 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
463 appin
= hapd
.request("WPS_AP_PIN random")
465 raise Exception("Could not generate random AP PIN")
466 if appin
not in hapd
.request("WPS_AP_PIN get"):
467 raise Exception("Could not fetch current AP PIN")
468 logger
.info("WPS provisioning step")
469 dev
[0].wps_reg(apdev
[0]['bssid'], appin
)
471 hapd
.request("WPS_AP_PIN disable")
472 logger
.info("WPS provisioning step with AP PIN disabled")
473 check_wps_reg_failure(dev
[1], apdev
[0], appin
)
475 logger
.info("WPS provisioning step with AP PIN reset")
477 hapd
.request("WPS_AP_PIN set " + appin
)
478 dev
[1].wps_reg(apdev
[0]['bssid'], appin
)
479 dev
[0].request("REMOVE_NETWORK all")
480 dev
[1].request("REMOVE_NETWORK all")
481 dev
[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
482 dev
[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
484 logger
.info("WPS provisioning step after AP PIN timeout")
485 hapd
.request("WPS_AP_PIN disable")
486 appin
= hapd
.request("WPS_AP_PIN random 1")
488 if "FAIL" not in hapd
.request("WPS_AP_PIN get"):
489 raise Exception("AP PIN unexpectedly still enabled")
490 check_wps_reg_failure(dev
[0], apdev
[0], appin
)
492 logger
.info("WPS provisioning step after AP PIN timeout(2)")
493 hapd
.request("WPS_AP_PIN disable")
495 hapd
.request("WPS_AP_PIN set " + appin
+ " 1")
497 if "FAIL" not in hapd
.request("WPS_AP_PIN get"):
498 raise Exception("AP PIN unexpectedly still enabled")
499 check_wps_reg_failure(dev
[1], apdev
[0], appin
)
501 def test_ap_wps_reg_config(dev
, apdev
):
502 """WPS registrar configuring an AP using AP PIN"""
503 ssid
= "test-wps-init-ap-pin"
505 hostapd
.add_ap(apdev
[0]['ifname'],
506 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
508 logger
.info("WPS configuration step")
509 dev
[0].dump_monitor()
510 new_ssid
= "wps-new-ssid"
511 new_passphrase
= "1234567890"
512 dev
[0].wps_reg(apdev
[0]['bssid'], appin
, new_ssid
, "WPA2PSK", "CCMP",
514 status
= dev
[0].get_status()
515 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
516 raise Exception("Not fully connected")
517 if status
['ssid'] != new_ssid
:
518 raise Exception("Unexpected SSID")
519 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'CCMP':
520 raise Exception("Unexpected encryption configuration")
521 if status
['key_mgmt'] != 'WPA2-PSK':
522 raise Exception("Unexpected key_mgmt")
524 logger
.info("Re-configure back to open")
525 dev
[0].request("REMOVE_NETWORK all")
526 dev
[0].request("BSS_FLUSH 0")
527 dev
[0].request("SCAN freq=2412 only_new=1")
528 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
530 raise Exception("Scan timed out")
531 dev
[0].dump_monitor()
532 dev
[0].wps_reg(apdev
[0]['bssid'], appin
, "wps-open", "OPEN", "NONE", "")
533 status
= dev
[0].get_status()
534 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
535 raise Exception("Not fully connected")
536 if status
['ssid'] != "wps-open":
537 raise Exception("Unexpected SSID")
538 if status
['key_mgmt'] != 'NONE':
539 raise Exception("Unexpected key_mgmt")
541 def test_ap_wps_reg_config_ext_processing(dev
, apdev
):
542 """WPS registrar configuring an AP with external config processing"""
543 ssid
= "test-wps-init-ap-pin"
545 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
546 "wps_cred_processing": "1", "ap_pin": appin
}
547 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
548 new_ssid
= "wps-new-ssid"
549 new_passphrase
= "1234567890"
550 dev
[0].wps_reg(apdev
[0]['bssid'], appin
, new_ssid
, "WPA2PSK", "CCMP",
551 new_passphrase
, no_wait
=True)
552 ev
= dev
[0].wait_event(["WPS-SUCCESS"], timeout
=15)
554 raise Exception("WPS registrar operation timed out")
555 ev
= hapd
.wait_event(["WPS-NEW-AP-SETTINGS"], timeout
=15)
557 raise Exception("WPS configuration timed out")
559 raise Exception("AP Settings missing from event")
560 hapd
.request("SET wps_cred_processing 0")
561 if "FAIL" in hapd
.request("WPS_CONFIG " + new_ssid
.encode("hex") + " WPA2PSK CCMP " + new_passphrase
.encode("hex")):
562 raise Exception("WPS_CONFIG command failed")
563 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=15)
565 raise Exception("Association with the AP timed out")
567 def test_ap_wps_reg_config_tkip(dev
, apdev
):
568 """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP"""
569 ssid
= "test-wps-init-ap"
571 hostapd
.add_ap(apdev
[0]['ifname'],
572 { "ssid": ssid
, "eap_server": "1", "wps_state": "1",
574 logger
.info("WPS configuration step")
575 dev
[0].request("SET wps_version_number 0x10")
576 dev
[0].dump_monitor()
577 new_ssid
= "wps-new-ssid-with-tkip"
578 new_passphrase
= "1234567890"
579 dev
[0].wps_reg(apdev
[0]['bssid'], appin
, new_ssid
, "WPAPSK", "TKIP",
581 logger
.info("Re-connect to verify WPA2 mixed mode")
582 dev
[0].request("DISCONNECT")
584 dev
[0].set_network(id, "pairwise", "CCMP")
585 dev
[0].set_network(id, "proto", "RSN")
586 dev
[0].connect_network(id)
587 status
= dev
[0].get_status()
588 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
589 raise Exception("Not fully connected")
590 if status
['ssid'] != new_ssid
:
591 raise Exception("Unexpected SSID")
592 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'TKIP':
593 raise Exception("Unexpected encryption configuration")
594 if status
['key_mgmt'] != 'WPA2-PSK':
595 raise Exception("Unexpected key_mgmt")
597 def test_ap_wps_setup_locked(dev
, apdev
):
598 """WPS registrar locking up AP setup on AP PIN failures"""
599 ssid
= "test-wps-incorrect-ap-pin"
601 hostapd
.add_ap(apdev
[0]['ifname'],
602 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
603 "wpa_passphrase": "12345678", "wpa": "2",
604 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
606 new_ssid
= "wps-new-ssid-test"
607 new_passphrase
= "1234567890"
609 ap_setup_locked
=False
610 for pin
in ["55554444", "1234", "12345678", "00000000", "11111111"]:
611 dev
[0].dump_monitor()
612 logger
.info("Try incorrect AP PIN - attempt " + pin
)
613 dev
[0].wps_reg(apdev
[0]['bssid'], pin
, new_ssid
, "WPA2PSK",
614 "CCMP", new_passphrase
, no_wait
=True)
615 ev
= dev
[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
617 raise Exception("Timeout on receiving WPS operation failure event")
618 if "CTRL-EVENT-CONNECTED" in ev
:
619 raise Exception("Unexpected connection")
620 if "config_error=15" in ev
:
621 logger
.info("AP Setup Locked")
623 elif "config_error=18" not in ev
:
624 raise Exception("config_error=18 not reported")
625 ev
= dev
[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
627 raise Exception("Timeout on disconnection event")
629 if not ap_setup_locked
:
630 raise Exception("AP setup was not locked")
632 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
633 status
= hapd
.request("WPS_GET_STATUS")
634 if "Last WPS result: Failed" not in status
:
635 raise Exception("WPS failure result not shown correctly")
636 if "Peer Address: " + dev
[0].p2p_interface_addr() not in status
:
637 raise Exception("Peer address not shown correctly")
640 dev
[0].dump_monitor()
641 logger
.info("WPS provisioning step")
642 pin
= dev
[0].wps_read_pin()
643 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
644 hapd
.request("WPS_PIN any " + pin
)
645 dev
[0].request("WPS_PIN any " + pin
)
646 ev
= dev
[0].wait_event(["WPS-SUCCESS"], timeout
=30)
648 raise Exception("WPS success was not reported")
649 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
651 raise Exception("Association with the AP timed out")
653 appin
= hapd
.request("WPS_AP_PIN random")
655 raise Exception("Could not generate random AP PIN")
656 ev
= hapd
.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout
=10)
658 raise Exception("Failed to unlock AP PIN")
660 def test_ap_wps_setup_locked_timeout(dev
, apdev
):
661 """WPS re-enabling AP PIN after timeout"""
662 ssid
= "test-wps-incorrect-ap-pin"
664 hostapd
.add_ap(apdev
[0]['ifname'],
665 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
666 "wpa_passphrase": "12345678", "wpa": "2",
667 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
669 new_ssid
= "wps-new-ssid-test"
670 new_passphrase
= "1234567890"
672 ap_setup_locked
=False
673 for pin
in ["55554444", "1234", "12345678", "00000000", "11111111"]:
674 dev
[0].dump_monitor()
675 logger
.info("Try incorrect AP PIN - attempt " + pin
)
676 dev
[0].wps_reg(apdev
[0]['bssid'], pin
, new_ssid
, "WPA2PSK",
677 "CCMP", new_passphrase
, no_wait
=True)
678 ev
= dev
[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
680 raise Exception("Timeout on receiving WPS operation failure event")
681 if "CTRL-EVENT-CONNECTED" in ev
:
682 raise Exception("Unexpected connection")
683 if "config_error=15" in ev
:
684 logger
.info("AP Setup Locked")
687 elif "config_error=18" not in ev
:
688 raise Exception("config_error=18 not reported")
689 ev
= dev
[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
691 raise Exception("Timeout on disconnection event")
693 if not ap_setup_locked
:
694 raise Exception("AP setup was not locked")
695 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
696 ev
= hapd
.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout
=80)
698 raise Exception("AP PIN did not get unlocked on 60 second timeout")
700 def test_ap_wps_pbc_overlap_2ap(dev
, apdev
):
701 """WPS PBC session overlap with two active APs"""
702 hostapd
.add_ap(apdev
[0]['ifname'],
703 { "ssid": "wps1", "eap_server": "1", "wps_state": "2",
704 "wpa_passphrase": "12345678", "wpa": "2",
705 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
706 "wps_independent": "1"})
707 hostapd
.add_ap(apdev
[1]['ifname'],
708 { "ssid": "wps2", "eap_server": "1", "wps_state": "2",
709 "wpa_passphrase": "123456789", "wpa": "2",
710 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
711 "wps_independent": "1"})
712 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
713 hapd
.request("WPS_PBC")
714 hapd2
= hostapd
.Hostapd(apdev
[1]['ifname'])
715 hapd2
.request("WPS_PBC")
716 logger
.info("WPS provisioning step")
717 dev
[0].dump_monitor()
718 dev
[0].request("WPS_PBC")
719 ev
= dev
[0].wait_event(["WPS-OVERLAP-DETECTED"], timeout
=15)
721 raise Exception("PBC session overlap not detected")
723 def test_ap_wps_pbc_overlap_2sta(dev
, apdev
):
724 """WPS PBC session overlap with two active STAs"""
725 ssid
= "test-wps-pbc-overlap"
726 hostapd
.add_ap(apdev
[0]['ifname'],
727 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
728 "wpa_passphrase": "12345678", "wpa": "2",
729 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
730 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
731 logger
.info("WPS provisioning step")
732 hapd
.request("WPS_PBC")
733 dev
[0].dump_monitor()
734 dev
[1].dump_monitor()
735 dev
[0].request("WPS_PBC")
736 dev
[1].request("WPS_PBC")
737 ev
= dev
[0].wait_event(["WPS-M2D"], timeout
=15)
739 raise Exception("PBC session overlap not detected (dev0)")
740 if "config_error=12" not in ev
:
741 raise Exception("PBC session overlap not correctly reported (dev0)")
742 ev
= dev
[1].wait_event(["WPS-M2D"], timeout
=15)
744 raise Exception("PBC session overlap not detected (dev1)")
745 if "config_error=12" not in ev
:
746 raise Exception("PBC session overlap not correctly reported (dev1)")
747 hapd
.request("WPS_CANCEL")
748 ret
= hapd
.request("WPS_PBC")
749 if "FAIL" not in ret
:
750 raise Exception("PBC mode allowed to be started while PBC overlap still active")
752 def test_ap_wps_cancel(dev
, apdev
):
753 """WPS AP cancelling enabled config method"""
754 ssid
= "test-wps-ap-cancel"
755 hostapd
.add_ap(apdev
[0]['ifname'],
756 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
757 "wpa_passphrase": "12345678", "wpa": "2",
758 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
759 bssid
= apdev
[0]['bssid']
760 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
762 logger
.info("Verify PBC enable/cancel")
763 hapd
.request("WPS_PBC")
764 dev
[0].scan(freq
="2412")
765 bss
= dev
[0].get_bss(apdev
[0]['bssid'])
766 if "[WPS-PBC]" not in bss
['flags']:
767 raise Exception("WPS-PBC flag missing")
768 if "FAIL" in hapd
.request("WPS_CANCEL"):
769 raise Exception("WPS_CANCEL failed")
770 dev
[0].scan(freq
="2412")
771 bss
= dev
[0].get_bss(apdev
[0]['bssid'])
772 if "[WPS-PBC]" in bss
['flags']:
773 raise Exception("WPS-PBC flag not cleared")
775 logger
.info("Verify PIN enable/cancel")
776 hapd
.request("WPS_PIN any 12345670")
777 dev
[0].scan(freq
="2412")
778 bss
= dev
[0].get_bss(apdev
[0]['bssid'])
779 if "[WPS-AUTH]" not in bss
['flags']:
780 raise Exception("WPS-AUTH flag missing")
781 if "FAIL" in hapd
.request("WPS_CANCEL"):
782 raise Exception("WPS_CANCEL failed")
783 dev
[0].scan(freq
="2412")
784 bss
= dev
[0].get_bss(apdev
[0]['bssid'])
785 if "[WPS-AUTH]" in bss
['flags']:
786 raise Exception("WPS-AUTH flag not cleared")
788 def test_ap_wps_er_add_enrollee(dev
, apdev
):
789 """WPS ER configuring AP and adding a new enrollee using PIN"""
790 ssid
= "wps-er-add-enrollee"
792 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
793 hostapd
.add_ap(apdev
[0]['ifname'],
794 { "ssid": ssid
, "eap_server": "1", "wps_state": "1",
795 "device_name": "Wireless AP", "manufacturer": "Company",
796 "model_name": "WAP", "model_number": "123",
797 "serial_number": "12345", "device_type": "6-0050F204-1",
798 "os_version": "01020300",
799 "config_methods": "label push_button",
800 "ap_pin": ap_pin
, "uuid": ap_uuid
, "upnp_iface": "lo"})
801 logger
.info("WPS configuration step")
802 new_passphrase
= "1234567890"
803 dev
[0].dump_monitor()
804 dev
[0].wps_reg(apdev
[0]['bssid'], ap_pin
, ssid
, "WPA2PSK", "CCMP",
806 status
= dev
[0].get_status()
807 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
808 raise Exception("Not fully connected")
809 if status
['ssid'] != ssid
:
810 raise Exception("Unexpected SSID")
811 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'CCMP':
812 raise Exception("Unexpected encryption configuration")
813 if status
['key_mgmt'] != 'WPA2-PSK':
814 raise Exception("Unexpected key_mgmt")
816 logger
.info("Start ER")
817 dev
[0].request("WPS_ER_START ifname=lo")
818 ev
= dev
[0].wait_event(["WPS-ER-AP-ADD"], timeout
=15)
820 raise Exception("AP discovery timed out")
821 if ap_uuid
not in ev
:
822 raise Exception("Expected AP UUID not found")
824 logger
.info("Learn AP configuration through UPnP")
825 dev
[0].dump_monitor()
826 dev
[0].request("WPS_ER_LEARN " + ap_uuid
+ " " + ap_pin
)
827 ev
= dev
[0].wait_event(["WPS-ER-AP-SETTINGS"], timeout
=15)
829 raise Exception("AP learn timed out")
830 if ap_uuid
not in ev
:
831 raise Exception("Expected AP UUID not in settings")
832 if "ssid=" + ssid
not in ev
:
833 raise Exception("Expected SSID not in settings")
834 if "key=" + new_passphrase
not in ev
:
835 raise Exception("Expected passphrase not in settings")
837 logger
.info("Add Enrollee using ER")
838 pin
= dev
[1].wps_read_pin()
839 dev
[0].dump_monitor()
840 dev
[0].request("WPS_ER_PIN any " + pin
+ " " + dev
[1].p2p_interface_addr())
841 dev
[1].dump_monitor()
842 dev
[1].request("WPS_PIN any " + pin
)
843 ev
= dev
[1].wait_event(["WPS-SUCCESS"], timeout
=30)
845 raise Exception("Enrollee did not report success")
846 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=15)
848 raise Exception("Association with the AP timed out")
849 ev
= dev
[0].wait_event(["WPS-SUCCESS"], timeout
=15)
851 raise Exception("WPS ER did not report success")
852 hwsim_utils
.test_connectivity_sta(dev
[0], dev
[1])
854 logger
.info("Add a specific Enrollee using ER")
855 pin
= dev
[2].wps_read_pin()
856 addr2
= dev
[2].p2p_interface_addr()
857 dev
[0].dump_monitor()
858 dev
[2].dump_monitor()
859 dev
[2].request("WPS_PIN any " + pin
)
860 ev
= dev
[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout
=10)
862 raise Exception("Enrollee not seen")
864 raise Exception("Unexpected Enrollee MAC address")
865 dev
[0].request("WPS_ER_PIN " + addr2
+ " " + pin
+ " " + addr2
)
866 ev
= dev
[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=15)
868 raise Exception("Association with the AP timed out")
869 ev
= dev
[0].wait_event(["WPS-SUCCESS"], timeout
=15)
871 raise Exception("WPS ER did not report success")
873 logger
.info("Verify registrar selection behavior")
874 dev
[0].request("WPS_ER_PIN any " + pin
+ " " + dev
[1].p2p_interface_addr())
875 dev
[1].request("DISCONNECT")
876 dev
[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
877 dev
[1].scan(freq
="2412")
878 bss
= dev
[1].get_bss(apdev
[0]['bssid'])
879 if "[WPS-AUTH]" not in bss
['flags']:
880 raise Exception("WPS-AUTH flag missing")
882 logger
.info("Stop ER")
883 dev
[0].dump_monitor()
884 dev
[0].request("WPS_ER_STOP")
885 ev
= dev
[0].wait_event(["WPS-ER-AP-REMOVE"])
887 raise Exception("WPS ER unsubscription timed out")
888 # It takes some time for the UPnP UNSUBSCRIBE command to go through, so wait
889 # a bit before verifying that the scan results have change.
892 dev
[1].scan(freq
="2412")
893 bss
= dev
[1].get_bss(apdev
[0]['bssid'])
894 if "[WPS-AUTH]" in bss
['flags']:
895 raise Exception("WPS-AUTH flag not removed")
897 def test_ap_wps_er_add_enrollee_pbc(dev
, apdev
):
898 """WPS ER connected to AP and adding a new enrollee using PBC"""
899 ssid
= "wps-er-add-enrollee-pbc"
901 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
902 hostapd
.add_ap(apdev
[0]['ifname'],
903 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
904 "wpa_passphrase": "12345678", "wpa": "2",
905 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
906 "device_name": "Wireless AP", "manufacturer": "Company",
907 "model_name": "WAP", "model_number": "123",
908 "serial_number": "12345", "device_type": "6-0050F204-1",
909 "os_version": "01020300",
910 "config_methods": "label push_button",
911 "ap_pin": ap_pin
, "uuid": ap_uuid
, "upnp_iface": "lo"})
912 logger
.info("Learn AP configuration")
913 dev
[0].dump_monitor()
914 dev
[0].wps_reg(apdev
[0]['bssid'], ap_pin
)
915 status
= dev
[0].get_status()
916 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
917 raise Exception("Not fully connected")
919 logger
.info("Start ER")
920 dev
[0].request("WPS_ER_START ifname=lo")
921 ev
= dev
[0].wait_event(["WPS-ER-AP-ADD"], timeout
=15)
923 raise Exception("AP discovery timed out")
924 if ap_uuid
not in ev
:
925 raise Exception("Expected AP UUID not found")
927 logger
.info("Use learned network configuration on ER")
928 dev
[0].request("WPS_ER_SET_CONFIG " + ap_uuid
+ " 0")
930 logger
.info("Add Enrollee using ER and PBC")
931 dev
[0].dump_monitor()
932 enrollee
= dev
[1].p2p_interface_addr()
933 dev
[1].dump_monitor()
934 dev
[1].request("WPS_PBC")
936 for i
in range(0, 2):
937 ev
= dev
[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout
=15)
939 raise Exception("Enrollee discovery timed out")
943 raise Exception("Expected Enrollee not found")
944 dev
[0].request("WPS_ER_PBC " + enrollee
)
946 ev
= dev
[1].wait_event(["WPS-SUCCESS"], timeout
=15)
948 raise Exception("Enrollee did not report success")
949 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=15)
951 raise Exception("Association with the AP timed out")
952 ev
= dev
[0].wait_event(["WPS-SUCCESS"], timeout
=15)
954 raise Exception("WPS ER did not report success")
955 hwsim_utils
.test_connectivity_sta(dev
[0], dev
[1])
957 # verify BSSID selection of the AP instead of UUID
958 if "FAIL" in dev
[0].request("WPS_ER_SET_CONFIG " + apdev
[0]['bssid'] + " 0"):
959 raise Exception("Could not select AP based on BSSID")
961 def test_ap_wps_er_v10_add_enrollee_pin(dev
, apdev
):
962 """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
963 ssid
= "wps-er-add-enrollee-pbc"
965 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
966 hostapd
.add_ap(apdev
[0]['ifname'],
967 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
968 "wpa_passphrase": "12345678", "wpa": "2",
969 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
970 "device_name": "Wireless AP", "manufacturer": "Company",
971 "model_name": "WAP", "model_number": "123",
972 "serial_number": "12345", "device_type": "6-0050F204-1",
973 "os_version": "01020300",
974 "config_methods": "label push_button",
975 "ap_pin": ap_pin
, "uuid": ap_uuid
, "upnp_iface": "lo"})
976 logger
.info("Learn AP configuration")
977 dev
[0].request("SET wps_version_number 0x10")
978 dev
[0].dump_monitor()
979 dev
[0].wps_reg(apdev
[0]['bssid'], ap_pin
)
980 status
= dev
[0].get_status()
981 if status
['wpa_state'] != 'COMPLETED' or status
['bssid'] != apdev
[0]['bssid']:
982 raise Exception("Not fully connected")
984 logger
.info("Start ER")
985 dev
[0].request("WPS_ER_START ifname=lo")
986 ev
= dev
[0].wait_event(["WPS-ER-AP-ADD"], timeout
=15)
988 raise Exception("AP discovery timed out")
989 if ap_uuid
not in ev
:
990 raise Exception("Expected AP UUID not found")
992 logger
.info("Use learned network configuration on ER")
993 dev
[0].request("WPS_ER_SET_CONFIG " + ap_uuid
+ " 0")
995 logger
.info("Add Enrollee using ER and PIN")
996 enrollee
= dev
[1].p2p_interface_addr()
997 pin
= dev
[1].wps_read_pin()
998 dev
[0].dump_monitor()
999 dev
[0].request("WPS_ER_PIN any " + pin
+ " " + enrollee
)
1000 dev
[1].dump_monitor()
1001 dev
[1].request("WPS_PIN any " + pin
)
1002 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=15)
1004 raise Exception("Association with the AP timed out")
1005 ev
= dev
[0].wait_event(["WPS-SUCCESS"], timeout
=15)
1007 raise Exception("WPS ER did not report success")
1009 def test_ap_wps_er_config_ap(dev
, apdev
):
1010 """WPS ER configuring AP over UPnP"""
1011 ssid
= "wps-er-ap-config"
1013 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1014 hostapd
.add_ap(apdev
[0]['ifname'],
1015 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1016 "wpa_passphrase": "12345678", "wpa": "2",
1017 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1018 "device_name": "Wireless AP", "manufacturer": "Company",
1019 "model_name": "WAP", "model_number": "123",
1020 "serial_number": "12345", "device_type": "6-0050F204-1",
1021 "os_version": "01020300",
1022 "config_methods": "label push_button",
1023 "ap_pin": ap_pin
, "uuid": ap_uuid
, "upnp_iface": "lo"})
1025 logger
.info("Connect ER to the AP")
1026 dev
[0].connect(ssid
, psk
="12345678", scan_freq
="2412")
1028 logger
.info("WPS configuration step")
1029 dev
[0].request("WPS_ER_START ifname=lo")
1030 ev
= dev
[0].wait_event(["WPS-ER-AP-ADD"], timeout
=15)
1032 raise Exception("AP discovery timed out")
1033 if ap_uuid
not in ev
:
1034 raise Exception("Expected AP UUID not found")
1035 new_passphrase
= "1234567890"
1036 dev
[0].request("WPS_ER_CONFIG " + apdev
[0]['bssid'] + " " + ap_pin
+ " " +
1037 ssid
.encode("hex") + " WPA2PSK CCMP " +
1038 new_passphrase
.encode("hex"))
1039 ev
= dev
[0].wait_event(["WPS-SUCCESS"])
1041 raise Exception("WPS ER configuration operation timed out")
1042 dev
[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
1043 dev
[0].connect(ssid
, psk
="1234567890", scan_freq
="2412")
1045 def test_ap_wps_fragmentation(dev
, apdev
):
1046 """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
1047 ssid
= "test-wps-fragmentation"
1049 hostapd
.add_ap(apdev
[0]['ifname'],
1050 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1051 "wpa_passphrase": "12345678", "wpa": "3",
1052 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1053 "wpa_pairwise": "TKIP", "ap_pin": appin
,
1054 "fragment_size": "50" })
1055 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1056 logger
.info("WPS provisioning step (PBC)")
1057 hapd
.request("WPS_PBC")
1058 dev
[0].dump_monitor()
1059 dev
[0].request("SET wps_fragment_size 50")
1060 dev
[0].request("WPS_PBC")
1061 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
1063 raise Exception("Association with the AP timed out")
1064 status
= dev
[0].get_status()
1065 if status
['wpa_state'] != 'COMPLETED':
1066 raise Exception("Not fully connected")
1067 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'TKIP':
1068 raise Exception("Unexpected encryption configuration")
1069 if status
['key_mgmt'] != 'WPA2-PSK':
1070 raise Exception("Unexpected key_mgmt")
1072 logger
.info("WPS provisioning step (PIN)")
1073 pin
= dev
[1].wps_read_pin()
1074 hapd
.request("WPS_PIN any " + pin
)
1075 dev
[1].request("SET wps_fragment_size 50")
1076 dev
[1].request("WPS_PIN any " + pin
)
1077 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
1079 raise Exception("Association with the AP timed out")
1080 status
= dev
[1].get_status()
1081 if status
['wpa_state'] != 'COMPLETED':
1082 raise Exception("Not fully connected")
1083 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'TKIP':
1084 raise Exception("Unexpected encryption configuration")
1085 if status
['key_mgmt'] != 'WPA2-PSK':
1086 raise Exception("Unexpected key_mgmt")
1088 logger
.info("WPS connection as registrar")
1089 dev
[2].request("SET wps_fragment_size 50")
1090 dev
[2].wps_reg(apdev
[0]['bssid'], appin
)
1091 status
= dev
[2].get_status()
1092 if status
['wpa_state'] != 'COMPLETED':
1093 raise Exception("Not fully connected")
1094 if status
['pairwise_cipher'] != 'CCMP' or status
['group_cipher'] != 'TKIP':
1095 raise Exception("Unexpected encryption configuration")
1096 if status
['key_mgmt'] != 'WPA2-PSK':
1097 raise Exception("Unexpected key_mgmt")
1099 def test_ap_wps_new_version_sta(dev
, apdev
):
1100 """WPS compatibility with new version number on the station"""
1101 ssid
= "test-wps-ver"
1102 hostapd
.add_ap(apdev
[0]['ifname'],
1103 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1104 "wpa_passphrase": "12345678", "wpa": "2",
1105 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1106 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1107 logger
.info("WPS provisioning step")
1108 hapd
.request("WPS_PBC")
1109 dev
[0].dump_monitor()
1110 dev
[0].request("SET wps_version_number 0x43")
1111 dev
[0].request("SET wps_vendor_ext_m1 000137100100020001")
1112 dev
[0].request("WPS_PBC")
1113 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
1115 raise Exception("Association with the AP timed out")
1117 def test_ap_wps_new_version_ap(dev
, apdev
):
1118 """WPS compatibility with new version number on the AP"""
1119 ssid
= "test-wps-ver"
1120 hostapd
.add_ap(apdev
[0]['ifname'],
1121 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1122 "wpa_passphrase": "12345678", "wpa": "2",
1123 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1124 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1125 logger
.info("WPS provisioning step")
1126 if "FAIL" in hapd
.request("SET wps_version_number 0x43"):
1127 raise Exception("Failed to enable test functionality")
1128 hapd
.request("WPS_PBC")
1129 dev
[0].dump_monitor()
1130 dev
[0].request("WPS_PBC")
1131 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
1132 hapd
.request("SET wps_version_number 0x20")
1134 raise Exception("Association with the AP timed out")
1136 def test_ap_wps_check_pin(dev
, apdev
):
1137 """Verify PIN checking through control interface"""
1138 hostapd
.add_ap(apdev
[0]['ifname'],
1139 { "ssid": "wps", "eap_server": "1", "wps_state": "2",
1140 "wpa_passphrase": "12345678", "wpa": "2",
1141 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1142 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1143 for t
in [ ("12345670", "12345670"),
1144 ("12345678", "FAIL-CHECKSUM"),
1146 ("123456789", "FAIL"),
1147 ("1234-5670", "12345670"),
1148 ("1234 5670", "12345670"),
1149 ("1-2.3:4 5670", "12345670") ]:
1150 res
= hapd
.request("WPS_CHECK_PIN " + t
[0]).rstrip('\n')
1151 res2
= dev
[0].request("WPS_CHECK_PIN " + t
[0]).rstrip('\n')
1153 raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
1155 raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(res
, t
[1]))
1157 if "FAIL" not in hapd
.request("WPS_CHECK_PIN 12345"):
1158 raise Exception("Unexpected WPS_CHECK_PIN success")
1159 if "FAIL" not in hapd
.request("WPS_CHECK_PIN 123456789"):
1160 raise Exception("Unexpected WPS_CHECK_PIN success")
1162 for i
in range(0, 10):
1163 pin
= dev
[0].request("WPS_PIN get")
1164 rpin
= dev
[0].request("WPS_CHECK_PIN " + pin
).rstrip('\n')
1166 raise Exception("Random PIN validation failed for " + pin
)
1168 def test_ap_wps_wep_config(dev
, apdev
):
1169 """WPS 2.0 AP rejecting WEP configuration"""
1170 ssid
= "test-wps-config"
1172 hostapd
.add_ap(apdev
[0]['ifname'],
1173 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1175 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1176 dev
[0].wps_reg(apdev
[0]['bssid'], appin
, "wps-new-ssid-wep", "OPEN", "WEP",
1177 "hello", no_wait
=True)
1178 ev
= hapd
.wait_event(["WPS-FAIL"], timeout
=15)
1180 raise Exception("WPS-FAIL timed out")
1181 if "reason=2" not in ev
:
1182 raise Exception("Unexpected reason code in WPS-FAIL")
1183 status
= hapd
.request("WPS_GET_STATUS")
1184 if "Last WPS result: Failed" not in status
:
1185 raise Exception("WPS failure result not shown correctly")
1186 if "Failure Reason: WEP Prohibited" not in status
:
1187 raise Exception("Failure reason not reported correctly")
1188 if "Peer Address: " + dev
[0].p2p_interface_addr() not in status
:
1189 raise Exception("Peer address not shown correctly")
1191 def test_ap_wps_wep_enroll(dev
, apdev
):
1192 """WPS 2.0 STA rejecting WEP configuration"""
1193 ssid
= "test-wps-wep"
1194 hostapd
.add_ap(apdev
[0]['ifname'],
1195 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1196 "skip_cred_build": "1", "extra_cred": "wps-wep-cred" })
1197 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1198 hapd
.request("WPS_PBC")
1199 dev
[0].request("WPS_PBC")
1200 ev
= dev
[0].wait_event(["WPS-FAIL"], timeout
=15)
1202 raise Exception("WPS-FAIL event timed out")
1203 if "msg=12" not in ev
or "reason=2 (WEP Prohibited)" not in ev
:
1204 raise Exception("Unexpected WPS-FAIL event: " + ev
)
1206 def test_ap_wps_ie_fragmentation(dev
, apdev
):
1207 """WPS AP using fragmented WPS IE"""
1208 ssid
= "test-wps-ie-fragmentation"
1209 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1210 "wpa_passphrase": "12345678", "wpa": "2",
1211 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1212 "device_name": "1234567890abcdef1234567890abcdef",
1213 "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
1214 "model_name": "1234567890abcdef1234567890abcdef",
1215 "model_number": "1234567890abcdef1234567890abcdef",
1216 "serial_number": "1234567890abcdef1234567890abcdef" }
1217 hostapd
.add_ap(apdev
[0]['ifname'], params
)
1218 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1219 hapd
.request("WPS_PBC")
1220 dev
[0].request("WPS_PBC")
1221 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
1223 raise Exception("Association with the AP timed out")
1224 bss
= dev
[0].get_bss(apdev
[0]['bssid'])
1225 if "wps_device_name" not in bss
or bss
['wps_device_name'] != "1234567890abcdef1234567890abcdef":
1227 raise Exception("Device Name not received correctly")
1228 if len(re
.findall("dd..0050f204", bss
['ie'])) != 2:
1229 raise Exception("Unexpected number of WPS IEs")
1231 def get_psk(pskfile
):
1233 with
open(pskfile
, "r") as f
:
1234 lines
= f
.read().splitlines()
1236 if l
== "# WPA PSKs":
1238 (addr
,psk
) = l
.split(' ')
1242 def test_ap_wps_per_station_psk(dev
, apdev
):
1243 """WPS PBC provisioning with per-station PSK"""
1244 addr0
= dev
[0].p2p_dev_addr()
1245 addr1
= dev
[1].p2p_dev_addr()
1246 addr2
= dev
[2].p2p_dev_addr()
1249 pskfile
= "/tmp/ap_wps_per_enrollee_psk.psk_file"
1256 with
open(pskfile
, "w") as f
:
1257 f
.write("# WPA PSKs\n")
1259 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1260 "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
1261 "rsn_pairwise": "CCMP", "ap_pin": appin
,
1262 "wpa_psk_file": pskfile
}
1263 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
1265 logger
.info("First enrollee")
1266 hapd
.request("WPS_PBC")
1267 dev
[0].request("WPS_PBC")
1268 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"])
1270 raise Exception("Association with the AP timed out (1)")
1272 logger
.info("Second enrollee")
1273 hapd
.request("WPS_PBC")
1274 dev
[1].request("WPS_PBC")
1275 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"])
1277 raise Exception("Association with the AP timed out (2)")
1279 logger
.info("External registrar")
1280 dev
[2].wps_reg(apdev
[0]['bssid'], appin
)
1282 logger
.info("Verifying PSK results")
1283 psks
= get_psk(pskfile
)
1284 if addr0
not in psks
:
1285 raise Exception("No PSK recorded for sta0")
1286 if addr1
not in psks
:
1287 raise Exception("No PSK recorded for sta1")
1288 if addr2
not in psks
:
1289 raise Exception("No PSK recorded for sta2")
1290 if psks
[addr0
] == psks
[addr1
]:
1291 raise Exception("Same PSK recorded for sta0 and sta1")
1292 if psks
[addr0
] == psks
[addr2
]:
1293 raise Exception("Same PSK recorded for sta0 and sta2")
1294 if psks
[addr1
] == psks
[addr2
]:
1295 raise Exception("Same PSK recorded for sta1 and sta2")
1297 dev
[0].request("REMOVE_NETWORK all")
1298 logger
.info("Second external registrar")
1299 dev
[0].wps_reg(apdev
[0]['bssid'], appin
)
1300 psks2
= get_psk(pskfile
)
1301 if addr0
not in psks2
:
1302 raise Exception("No PSK recorded for sta0(reg)")
1303 if psks
[addr0
] == psks2
[addr0
]:
1304 raise Exception("Same PSK recorded for sta0(enrollee) and sta0(reg)")
1308 def test_ap_wps_per_station_psk_failure(dev
, apdev
):
1309 """WPS PBC provisioning with per-station PSK (file not writable)"""
1310 addr0
= dev
[0].p2p_dev_addr()
1311 addr1
= dev
[1].p2p_dev_addr()
1312 addr2
= dev
[2].p2p_dev_addr()
1315 pskfile
= "/tmp/ap_wps_per_enrollee_psk.psk_file"
1322 with
open(pskfile
, "w") as f
:
1323 f
.write("# WPA PSKs\n")
1325 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1326 "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
1327 "rsn_pairwise": "CCMP", "ap_pin": appin
,
1328 "wpa_psk_file": pskfile
}
1329 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
1330 if "FAIL" in hapd
.request("SET wpa_psk_file /tmp/does/not/exists/ap_wps_per_enrollee_psk_failure.psk_file"):
1331 raise Exception("Failed to set wpa_psk_file")
1333 logger
.info("First enrollee")
1334 hapd
.request("WPS_PBC")
1335 dev
[0].request("WPS_PBC")
1336 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"])
1338 raise Exception("Association with the AP timed out (1)")
1340 logger
.info("Second enrollee")
1341 hapd
.request("WPS_PBC")
1342 dev
[1].request("WPS_PBC")
1343 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"])
1345 raise Exception("Association with the AP timed out (2)")
1347 logger
.info("External registrar")
1348 dev
[2].wps_reg(apdev
[0]['bssid'], appin
)
1350 logger
.info("Verifying PSK results")
1351 psks
= get_psk(pskfile
)
1353 raise Exception("PSK recorded unexpectedly")
1357 def test_ap_wps_pin_request_file(dev
, apdev
):
1358 """WPS PIN provisioning with configured AP"""
1360 pinfile
= "/tmp/ap_wps_pin_request_file.log"
1361 if os
.path
.exists(pinfile
):
1362 subprocess
.call(['sudo', 'rm', pinfile
])
1363 hostapd
.add_ap(apdev
[0]['ifname'],
1364 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1365 "wps_pin_requests": pinfile
,
1366 "wpa_passphrase": "12345678", "wpa": "2",
1367 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
1368 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1369 uuid
= dev
[0].get_status_field("uuid")
1370 pin
= dev
[0].wps_read_pin()
1372 dev
[0].request("WPS_PIN any " + pin
)
1373 ev
= hapd
.wait_event(["WPS-PIN-NEEDED"], timeout
=15)
1375 raise Exception("PIN needed event not shown")
1377 raise Exception("UUID mismatch")
1378 dev
[0].request("WPS_CANCEL")
1380 with
open(pinfile
, "r") as f
:
1381 lines
= f
.readlines()
1387 raise Exception("PIN request entry not in the log file")
1389 subprocess
.call(['sudo', 'rm', pinfile
])
1391 def test_ap_wps_auto_setup_with_config_file(dev
, apdev
):
1392 """WPS auto-setup with configuration file"""
1393 conffile
= "/tmp/ap_wps_auto_setup_with_config_file.conf"
1394 ifname
= apdev
[0]['ifname']
1396 with
open(conffile
, "w") as f
:
1397 f
.write("driver=nl80211\n")
1398 f
.write("hw_mode=g\n")
1399 f
.write("channel=1\n")
1400 f
.write("ieee80211n=1\n")
1401 f
.write("interface=%s\n" % ifname
)
1402 f
.write("ctrl_interface=/var/run/hostapd\n")
1403 f
.write("ssid=wps\n")
1404 f
.write("eap_server=1\n")
1405 f
.write("wps_state=1\n")
1406 hostapd
.add_bss('phy3', ifname
, conffile
)
1407 hapd
= hostapd
.Hostapd(ifname
)
1408 hapd
.request("WPS_PBC")
1409 dev
[0].request("WPS_PBC")
1410 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=30)
1412 raise Exception("Association with the AP timed out")
1413 with
open(conffile
, "r") as f
:
1414 lines
= f
.read().splitlines()
1418 [name
,value
] = l
.split('=', 1)
1420 except ValueError, e
:
1421 if "# WPS configuration" in l
:
1424 raise Exception("Unexpected configuration line: " + l
)
1425 if vals
['ieee80211n'] != '1' or vals
['wps_state'] != '2' or "WPA-PSK" not in vals
['wpa_key_mgmt']:
1426 raise Exception("Incorrect configuration: " + str(vals
))
1428 subprocess
.call(['sudo', 'rm', conffile
])
1430 def test_ap_wps_pbc_timeout(dev
, apdev
, params
):
1431 """wpa_supplicant PBC walk time [long]"""
1432 if not params
['long']:
1433 logger
.info("Skip test case with long duration due to --long not specified")
1436 hostapd
.add_ap(apdev
[0]['ifname'],
1437 { "ssid": ssid
, "eap_server": "1", "wps_state": "1" })
1438 hapd
= hostapd
.Hostapd(apdev
[0]['ifname'])
1439 logger
.info("Start WPS_PBC and wait for PBC walk time expiration")
1440 if "OK" not in dev
[0].request("WPS_PBC"):
1441 raise Exception("WPS_PBC failed")
1442 ev
= dev
[0].wait_event(["WPS-TIMEOUT"], timeout
=150)
1444 raise Exception("WPS-TIMEOUT not reported")
1446 def add_ssdp_ap(ifname
, ap_uuid
):
1449 hostapd
.add_ap(ifname
,
1450 { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
1451 "wpa_passphrase": "12345678", "wpa": "2",
1452 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1453 "device_name": "Wireless AP", "manufacturer": "Company",
1454 "model_name": "WAP", "model_number": "123",
1455 "serial_number": "12345", "device_type": "6-0050F204-1",
1456 "os_version": "01020300",
1457 "config_methods": "label push_button",
1458 "ap_pin": ap_pin
, "uuid": ap_uuid
, "upnp_iface": "lo",
1459 "friendly_name": "WPS Access Point",
1460 "manufacturer_url": "http://www.example.com/",
1461 "model_description": "Wireless Access Point",
1462 "model_url": "http://www.example.com/model/",
1463 "upc": "123456789012" })
1465 def ssdp_send(msg
, no_recv
=False):
1466 socket
.setdefaulttimeout(1)
1467 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, socket
.IPPROTO_UDP
)
1468 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1469 sock
.setsockopt(socket
.IPPROTO_IP
, socket
.IP_MULTICAST_TTL
, 2)
1470 sock
.bind(("127.0.0.1", 0))
1471 sock
.sendto(msg
, ("239.255.255.250", 1900))
1474 return sock
.recv(1000)
1476 def ssdp_send_msearch(st
):
1478 'M-SEARCH * HTTP/1.1',
1479 'HOST: 239.255.255.250:1900',
1481 'MAN: "ssdp:discover"',
1484 return ssdp_send(msg
)
1486 def test_ap_wps_ssdp_msearch(dev
, apdev
):
1487 """WPS AP and SSDP M-SEARCH messages"""
1488 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1489 add_ssdp_ap(apdev
[0]['ifname'], ap_uuid
)
1492 'M-SEARCH * HTTP/1.1',
1493 'Host: 239.255.255.250:1900',
1495 'Man: "ssdp:discover"',
1496 'St: urn:schemas-wifialliance-org:device:WFADevice:1',
1501 'M-SEARCH * HTTP/1.1',
1502 'host:\t239.255.255.250:1900\t\t\t\t \t\t',
1504 'man: \t \t "ssdp:discover" ',
1505 'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t',
1509 ssdp_send_msearch("ssdp:all")
1510 ssdp_send_msearch("upnp:rootdevice")
1511 ssdp_send_msearch("uuid:" + ap_uuid
)
1512 ssdp_send_msearch("urn:schemas-wifialliance-org:service:WFAWLANConfig:1")
1513 ssdp_send_msearch("urn:schemas-wifialliance-org:device:WFADevice:1");
1516 'M-SEARCH * HTTP/1.1',
1517 'HOST:\t239.255.255.250:1900',
1518 'MAN: "ssdp:discover"',
1520 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1522 ssdp_send(msg
, no_recv
=True)
1524 def test_ap_wps_ssdp_invalid_msearch(dev
, apdev
):
1525 """WPS AP and invalid SSDP M-SEARCH messages"""
1526 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1527 add_ssdp_ap(apdev
[0]['ifname'], ap_uuid
)
1529 socket
.setdefaulttimeout(1)
1530 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, socket
.IPPROTO_UDP
)
1531 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1532 sock
.setsockopt(socket
.IPPROTO_IP
, socket
.IP_MULTICAST_TTL
, 2)
1533 sock
.bind(("127.0.0.1", 0))
1535 logger
.debug("Missing MX")
1537 'M-SEARCH * HTTP/1.1',
1538 'HOST: 239.255.255.250:1900',
1539 'MAN: "ssdp:discover"',
1540 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1542 sock
.sendto(msg
, ("239.255.255.250", 1900))
1544 logger
.debug("Negative MX")
1546 'M-SEARCH * HTTP/1.1',
1547 'HOST: 239.255.255.250:1900',
1549 'MAN: "ssdp:discover"',
1550 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1552 sock
.sendto(msg
, ("239.255.255.250", 1900))
1554 logger
.debug("Invalid MX")
1556 'M-SEARCH * HTTP/1.1',
1557 'HOST: 239.255.255.250:1900',
1559 'MAN: "ssdp:discover"',
1560 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1562 sock
.sendto(msg
, ("239.255.255.250", 1900))
1564 logger
.debug("Missing MAN")
1566 'M-SEARCH * HTTP/1.1',
1567 'HOST: 239.255.255.250:1900',
1569 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1571 sock
.sendto(msg
, ("239.255.255.250", 1900))
1573 logger
.debug("Invalid MAN")
1575 'M-SEARCH * HTTP/1.1',
1576 'HOST: 239.255.255.250:1900',
1579 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1581 sock
.sendto(msg
, ("239.255.255.250", 1900))
1583 'M-SEARCH * HTTP/1.1',
1584 'HOST: 239.255.255.250:1900',
1586 'MAN; "ssdp:discover"',
1587 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1589 sock
.sendto(msg
, ("239.255.255.250", 1900))
1591 logger
.debug("Missing HOST")
1593 'M-SEARCH * HTTP/1.1',
1594 'MAN: "ssdp:discover"',
1596 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1598 sock
.sendto(msg
, ("239.255.255.250", 1900))
1600 logger
.debug("Missing ST")
1602 'M-SEARCH * HTTP/1.1',
1603 'HOST: 239.255.255.250:1900',
1604 'MAN: "ssdp:discover"',
1607 sock
.sendto(msg
, ("239.255.255.250", 1900))
1609 logger
.debug("Mismatching ST")
1611 'M-SEARCH * HTTP/1.1',
1612 'HOST: 239.255.255.250:1900',
1613 'MAN: "ssdp:discover"',
1615 'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d',
1617 sock
.sendto(msg
, ("239.255.255.250", 1900))
1619 'M-SEARCH * HTTP/1.1',
1620 'HOST: 239.255.255.250:1900',
1621 'MAN: "ssdp:discover"',
1625 sock
.sendto(msg
, ("239.255.255.250", 1900))
1627 'M-SEARCH * HTTP/1.1',
1628 'HOST: 239.255.255.250:1900',
1629 'MAN: "ssdp:discover"',
1633 sock
.sendto(msg
, ("239.255.255.250", 1900))
1635 logger
.debug("Invalid ST")
1637 'M-SEARCH * HTTP/1.1',
1638 'HOST: 239.255.255.250:1900',
1639 'MAN: "ssdp:discover"',
1641 'ST; urn:schemas-wifialliance-org:device:WFADevice:1',
1643 sock
.sendto(msg
, ("239.255.255.250", 1900))
1645 logger
.debug("Invalid M-SEARCH")
1647 'M+SEARCH * HTTP/1.1',
1648 'HOST: 239.255.255.250:1900',
1649 'MAN: "ssdp:discover"',
1651 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1653 sock
.sendto(msg
, ("239.255.255.250", 1900))
1655 'M-SEARCH-* HTTP/1.1',
1656 'HOST: 239.255.255.250:1900',
1657 'MAN: "ssdp:discover"',
1659 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1661 sock
.sendto(msg
, ("239.255.255.250", 1900))
1663 logger
.debug("Invalid message format")
1664 sock
.sendto("NOTIFY * HTTP/1.1", ("239.255.255.250", 1900))
1666 'M-SEARCH * HTTP/1.1',
1667 'HOST: 239.255.255.250:1900',
1668 'MAN: "ssdp:discover"',
1670 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1672 sock
.sendto(msg
, ("239.255.255.250", 1900))
1676 raise Exception("Unexpected M-SEARCH response: " + r
)
1677 except socket
.timeout
:
1680 logger
.debug("Valid M-SEARCH")
1682 'M-SEARCH * HTTP/1.1',
1683 'HOST: 239.255.255.250:1900',
1684 'MAN: "ssdp:discover"',
1686 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1688 sock
.sendto(msg
, ("239.255.255.250", 1900))
1693 except socket
.timeout
:
1694 raise Exception("No SSDP response")
1696 def test_ap_wps_ssdp_burst(dev
, apdev
):
1697 """WPS AP and SSDP burst"""
1698 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1699 add_ssdp_ap(apdev
[0]['ifname'], ap_uuid
)
1702 'M-SEARCH * HTTP/1.1',
1703 'HOST: 239.255.255.250:1900',
1704 'MAN: "ssdp:discover"',
1706 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1708 socket
.setdefaulttimeout(1)
1709 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, socket
.IPPROTO_UDP
)
1710 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1711 sock
.setsockopt(socket
.IPPROTO_IP
, socket
.IP_MULTICAST_TTL
, 2)
1712 sock
.bind(("127.0.0.1", 0))
1713 for i
in range(0, 25):
1714 sock
.sendto(msg
, ("239.255.255.250", 1900))
1719 if not r
.startswith("HTTP/1.1 200 OK\r\n"):
1720 raise Exception("Unexpected message: " + r
)
1722 except socket
.timeout
:
1725 raise Exception("Too few SSDP responses")
1727 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, socket
.IPPROTO_UDP
)
1728 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1729 sock
.setsockopt(socket
.IPPROTO_IP
, socket
.IP_MULTICAST_TTL
, 2)
1730 sock
.bind(("127.0.0.1", 0))
1731 for i
in range(0, 25):
1732 sock
.sendto(msg
, ("239.255.255.250", 1900))
1738 except socket
.timeout
:
1739 raise Exception("No SSDP response")
1741 def ssdp_get_location(uuid
):
1742 res
= ssdp_send_msearch("uuid:" + uuid
)
1744 for l
in res
.splitlines():
1745 if l
.lower().startswith("location:"):
1746 location
= l
.split(':', 1)[1].strip()
1748 if location
is None:
1749 raise Exception("No UPnP location found")
1752 def upnp_get_urls(location
):
1753 conn
= urllib
.urlopen(location
)
1754 tree
= ET
.parse(conn
)
1755 root
= tree
.getroot()
1756 urn
= '{urn:schemas-upnp-org:device-1-0}'
1757 service
= root
.find("./" + urn
+ "device/" + urn
+ "serviceList/" + urn
+ "service")
1759 res
['scpd_url'] = urlparse
.urljoin(location
, service
.find(urn
+ 'SCPDURL').text
)
1760 res
['control_url'] = urlparse
.urljoin(location
, service
.find(urn
+ 'controlURL').text
)
1761 res
['event_sub_url'] = urlparse
.urljoin(location
, service
.find(urn
+ 'eventSubURL').text
)
1764 def upnp_soap_action(conn
, path
, action
, include_soap_action
=True, soap_action_override
=None):
1765 soapns
= 'http://schemas.xmlsoap.org/soap/envelope/'
1766 wpsns
= 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
1767 ET
.register_namespace('soapenv', soapns
)
1768 ET
.register_namespace('wfa', wpsns
)
1770 attrib
['{%s}encodingStyle' % soapns
] = 'http://schemas.xmlsoap.org/soap/encoding/'
1771 root
= ET
.Element("{%s}Envelope" % soapns
, attrib
=attrib
)
1772 body
= ET
.SubElement(root
, "{%s}Body" % soapns
)
1773 act
= ET
.SubElement(body
, "{%s}%s" % (wpsns
, action
))
1774 tree
= ET
.ElementTree(root
)
1775 soap
= StringIO
.StringIO()
1776 tree
.write(soap
, xml_declaration
=True, encoding
='utf-8')
1778 headers
= { "Content-type": 'text/xml; charset="utf-8"' }
1779 if include_soap_action
:
1780 headers
["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
1781 elif soap_action_override
:
1782 headers
["SOAPAction"] = soap_action_override
1783 conn
.request("POST", path
, soap
.getvalue(), headers
)
1784 return conn
.getresponse()
1786 def test_ap_wps_upnp(dev
, apdev
):
1787 """WPS AP and UPnP operations"""
1788 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1789 add_ssdp_ap(apdev
[0]['ifname'], ap_uuid
)
1791 location
= ssdp_get_location(ap_uuid
)
1792 urls
= upnp_get_urls(location
)
1794 conn
= urllib
.urlopen(urls
['scpd_url'])
1797 conn
= urllib
.urlopen(urlparse
.urljoin(location
, "unknown.html"))
1798 if conn
.getcode() != 404:
1799 raise Exception("Unexpected HTTP response to GET unknown URL")
1801 url
= urlparse
.urlparse(location
)
1802 conn
= httplib
.HTTPConnection(url
.netloc
)
1803 #conn.set_debuglevel(1)
1804 headers
= { "Content-type": 'text/xml; charset="utf-8"',
1805 "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
1806 conn
.request("POST", "hello", "\r\n\r\n", headers
)
1807 resp
= conn
.getresponse()
1808 if resp
.status
!= 404:
1809 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1811 conn
.request("UNKNOWN", "hello", "\r\n\r\n", headers
)
1812 resp
= conn
.getresponse()
1813 if resp
.status
!= 501:
1814 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1816 headers
= { "Content-type": 'text/xml; charset="utf-8"',
1817 "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
1818 ctrlurl
= urlparse
.urlparse(urls
['control_url'])
1819 conn
.request("POST", ctrlurl
.path
, "\r\n\r\n", headers
)
1820 resp
= conn
.getresponse()
1821 if resp
.status
!= 401:
1822 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1824 logger
.debug("GetDeviceInfo without SOAPAction header")
1825 resp
= upnp_soap_action(conn
, ctrlurl
.path
, "GetDeviceInfo",
1826 include_soap_action
=False)
1827 if resp
.status
!= 401:
1828 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1830 logger
.debug("GetDeviceInfo with invalid SOAPAction header")
1832 "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
1833 '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
1834 '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']:
1835 resp
= upnp_soap_action(conn
, ctrlurl
.path
, "GetDeviceInfo",
1836 include_soap_action
=False,
1837 soap_action_override
=act
)
1838 if resp
.status
!= 401:
1839 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1841 resp
= upnp_soap_action(conn
, ctrlurl
.path
, "GetDeviceInfo")
1842 if resp
.status
!= 200:
1843 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1845 if "NewDeviceInfo" not in dev
:
1846 raise Exception("Unexpected GetDeviceInfo response")
1848 logger
.debug("PutMessage without required parameters")
1849 resp
= upnp_soap_action(conn
, ctrlurl
.path
, "PutMessage")
1850 if resp
.status
!= 600:
1851 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1853 logger
.debug("PutWLANResponse without required parameters")
1854 resp
= upnp_soap_action(conn
, ctrlurl
.path
, "PutWLANResponse")
1855 if resp
.status
!= 600:
1856 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1858 logger
.debug("SetSelectedRegistrar from unregistered ER")
1859 resp
= upnp_soap_action(conn
, ctrlurl
.path
, "SetSelectedRegistrar")
1860 if resp
.status
!= 501:
1861 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1863 logger
.debug("Unknown action")
1864 resp
= upnp_soap_action(conn
, ctrlurl
.path
, "Unknown")
1865 if resp
.status
!= 401:
1866 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1868 def test_ap_wps_upnp_subscribe(dev
, apdev
):
1869 """WPS AP and UPnP event subscription"""
1870 ap_uuid
= "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1871 add_ssdp_ap(apdev
[0]['ifname'], ap_uuid
)
1873 location
= ssdp_get_location(ap_uuid
)
1874 urls
= upnp_get_urls(location
)
1875 eventurl
= urlparse
.urlparse(urls
['event_sub_url'])
1877 url
= urlparse
.urlparse(location
)
1878 conn
= httplib
.HTTPConnection(url
.netloc
)
1879 #conn.set_debuglevel(1)
1880 headers
= { "callback": '<http://127.0.0.1:12345/event>',
1881 "timeout": "Second-1234" }
1882 conn
.request("SUBSCRIBE", "hello", "\r\n\r\n", headers
)
1883 resp
= conn
.getresponse()
1884 if resp
.status
!= 412:
1885 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1887 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1888 resp
= conn
.getresponse()
1889 if resp
.status
!= 412:
1890 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1892 headers
= { "NT": "upnp:event",
1893 "timeout": "Second-1234" }
1894 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1895 resp
= conn
.getresponse()
1896 if resp
.status
!= 412:
1897 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1899 headers
= { "callback": '<http://127.0.0.1:12345/event>',
1900 "NT": "upnp:foobar",
1901 "timeout": "Second-1234" }
1902 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1903 resp
= conn
.getresponse()
1904 if resp
.status
!= 400:
1905 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1907 logger
.debug("Valid subscription")
1908 headers
= { "callback": '<http://127.0.0.1:12345/event>',
1910 "timeout": "Second-1234" }
1911 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1912 resp
= conn
.getresponse()
1913 if resp
.status
!= 200:
1914 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1915 sid
= resp
.getheader("sid")
1916 logger
.debug("Subscription SID " + sid
)
1918 logger
.debug("Invalid re-subscription")
1919 headers
= { "NT": "upnp:event",
1920 "sid": "123456734567854",
1921 "timeout": "Second-1234" }
1922 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1923 resp
= conn
.getresponse()
1924 if resp
.status
!= 400:
1925 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1927 logger
.debug("Invalid re-subscription")
1928 headers
= { "NT": "upnp:event",
1929 "sid": "uuid:123456734567854",
1930 "timeout": "Second-1234" }
1931 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1932 resp
= conn
.getresponse()
1933 if resp
.status
!= 400:
1934 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1936 logger
.debug("Invalid re-subscription")
1937 headers
= { "callback": '<http://127.0.0.1:12345/event>',
1940 "timeout": "Second-1234" }
1941 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1942 resp
= conn
.getresponse()
1943 if resp
.status
!= 400:
1944 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1946 logger
.debug("SID mismatch in re-subscription")
1947 headers
= { "NT": "upnp:event",
1948 "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
1949 "timeout": "Second-1234" }
1950 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1951 resp
= conn
.getresponse()
1952 if resp
.status
!= 412:
1953 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1955 logger
.debug("Valid re-subscription")
1956 headers
= { "NT": "upnp:event",
1958 "timeout": "Second-1234" }
1959 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1960 resp
= conn
.getresponse()
1961 if resp
.status
!= 200:
1962 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1963 sid2
= resp
.getheader("sid")
1964 logger
.debug("Subscription SID " + sid2
)
1967 raise Exception("Unexpected SID change")
1969 logger
.debug("Valid re-subscription")
1970 headers
= { "NT": "upnp:event",
1971 "sid": "uuid: \t \t" + sid
.split(':')[1],
1972 "timeout": "Second-1234" }
1973 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1974 resp
= conn
.getresponse()
1975 if resp
.status
!= 200:
1976 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1978 logger
.debug("Invalid unsubscription")
1979 headers
= { "sid": sid
}
1980 conn
.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers
)
1981 resp
= conn
.getresponse()
1982 if resp
.status
!= 412:
1983 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1984 headers
= { "foo": "bar" }
1985 conn
.request("UNSUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1986 resp
= conn
.getresponse()
1987 if resp
.status
!= 412:
1988 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1990 logger
.debug("Valid unsubscription")
1991 headers
= { "sid": sid
}
1992 conn
.request("UNSUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
1993 resp
= conn
.getresponse()
1994 if resp
.status
!= 200:
1995 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
1997 logger
.debug("Unsubscription for not existing SID")
1998 headers
= { "sid": sid
}
1999 conn
.request("UNSUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
2000 resp
= conn
.getresponse()
2001 if resp
.status
!= 412:
2002 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
2004 logger
.debug("Invalid unsubscription")
2005 headers
= { "sid": " \t \tfoo" }
2006 conn
.request("UNSUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
2007 resp
= conn
.getresponse()
2008 if resp
.status
!= 400:
2009 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
2011 logger
.debug("Invalid unsubscription")
2012 headers
= { "sid": "uuid:\t \tfoo" }
2013 conn
.request("UNSUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
2014 resp
= conn
.getresponse()
2015 if resp
.status
!= 400:
2016 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
2018 logger
.debug("Invalid unsubscription")
2019 headers
= { "NT": "upnp:event",
2021 conn
.request("UNSUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
2022 resp
= conn
.getresponse()
2023 if resp
.status
!= 400:
2024 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
2025 headers
= { "callback": '<http://127.0.0.1:12345/event>',
2027 conn
.request("UNSUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
2028 resp
= conn
.getresponse()
2029 if resp
.status
!= 400:
2030 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
2032 logger
.debug("Valid subscription with multiple callbacks")
2033 headers
= { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
2035 "timeout": "Second-1234" }
2036 conn
.request("SUBSCRIBE", eventurl
.path
, "\r\n\r\n", headers
)
2037 resp
= conn
.getresponse()
2038 if resp
.status
!= 200:
2039 raise Exception("Unexpected HTTP response: %s" % resp
.status
)
2040 sid
= resp
.getheader("sid")
2041 logger
.debug("Subscription SID " + sid
)