]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/test_ap_wps.py
tests: Additional WPS ctrl_iface coverage
[thirdparty/hostap.git] / tests / hwsim / test_ap_wps.py
1 # WPS tests
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import subprocess
10 import logging
11 logger = logging.getLogger()
12 import re
13 import socket
14 import httplib
15 import urlparse
16 import urllib
17 import xml.etree.ElementTree as ET
18 import StringIO
19
20 import hwsim_utils
21 import hostapd
22
def test_ap_wps_init(dev, apdev):
    """Initial AP configuration with first WPS Enrollee"""
    # Scenario: an AP started with wps_state=1 is provisioned through PBC by
    # dev[0]. The station already holds two unrelated network blocks, so
    # exactly three are expected once WPS has added its own.
    ssid = "test-wps"
    ifname = apdev[0]['ifname']
    sta = dev[0]
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    pbc_report = hapd.request("WPS_GET_STATUS")
    if "PBC Status: Active" not in pbc_report:
        raise Exception("PBC status not shown correctly")

    # Pre-existing network blocks, enabled with the no-connect option.
    netid = sta.add_network()
    sta.set_network_quoted(netid, "ssid", "home")
    sta.set_network_quoted(netid, "psk", "12345678")
    sta.request("ENABLE_NETWORK %s no-connect" % netid)

    netid = sta.add_network()
    sta.set_network_quoted(netid, "ssid", "home2")
    sta.set_network(netid, "bssid", "00:11:22:33:44:55")
    sta.set_network(netid, "key_mgmt", "NONE")
    sta.request("ENABLE_NETWORK %s no-connect" % netid)

    sta.request("WPS_PBC")
    connected = sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if connected is None:
        raise Exception("Association with the AP timed out")

    # Station side: the resulting connection must be WPA2-PSK/CCMP to our AP.
    status = sta.get_status()
    fully_connected = (status['wpa_state'] == 'COMPLETED' and
                       status['bssid'] == apdev[0]['bssid'])
    if not fully_connected:
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if status['pairwise_cipher'] != 'CCMP':
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # AP side: WPS status after the exchange.
    wps_report = hapd.request("WPS_GET_STATUS")
    if "PBC Status: Disabled" not in wps_report:
        raise Exception("PBC status not shown correctly")
    if "Last WPS result: Success" not in wps_report:
        raise Exception("Last WPS result not shown correctly")
    peer_line = "Peer Address: " + sta.p2p_interface_addr()
    if peer_line not in wps_report:
        raise Exception("Peer address not shown correctly")

    # AP side: the configuration that hostapd reports after being provisioned.
    conf = hapd.request("GET_CONFIG")
    expected_conf = [
        ("wps_state=configured", "AP not in WPS configured state"),
        ("rsn_pairwise_cipher=CCMP TKIP", "Unexpected rsn_pairwise_cipher"),
        ("wpa_pairwise_cipher=CCMP TKIP", "Unexpected wpa_pairwise_cipher"),
        ("group_cipher=TKIP", "Unexpected group_cipher"),
    ]
    for needle, error in expected_conf:
        if needle not in conf:
            raise Exception(error)

    if len(sta.list_networks()) != 3:
        raise Exception("Unexpected number of network blocks")
78
def test_ap_wps_init_2ap_pbc(dev, apdev):
    """Initial two-radio AP configuration with first WPS PBC Enrollee"""
    # Two APs are started with identical parameters. WPS_PBC is requested
    # only through the first AP's control interface, yet both BSSes are
    # expected to advertise [WPS-PBC] before the exchange and neither after.
    ssid = "test-wps"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].scan(freq="2412")
    bss = dev[0].get_bss(apdev[0]['bssid'])
    if "[WPS-PBC]" not in bss['flags']:
        raise Exception("WPS-PBC flag missing from AP1")
    bss = dev[0].get_bss(apdev[1]['bssid'])
    if "[WPS-PBC]" not in bss['flags']:
        raise Exception("WPS-PBC flag missing from AP2")
    dev[0].dump_monitor()
    dev[0].request("SET wps_cred_processing 2")
    dev[0].request("WPS_PBC")
    ev = dev[0].wait_event(["WPS-CRED-RECEIVED"], timeout=30)
    # Restore the setting before inspecting the event so that it is reset
    # even when one of the checks below raises.
    dev[0].request("SET wps_cred_processing 0")
    if ev is None:
        raise Exception("WPS cred event not seen")
    if "100e" not in ev:
        raise Exception("WPS attributes not included in the cred event")
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # A second station scans afresh to verify that the PBC indication is gone
    # from both BSSes once provisioning has completed.
    dev[1].scan(freq="2412")
    bss = dev[1].get_bss(apdev[0]['bssid'])
    if "[WPS-PBC]" in bss['flags']:
        raise Exception("WPS-PBC flag not cleared from AP1")
    bss = dev[1].get_bss(apdev[1]['bssid'])
    if "[WPS-PBC]" in bss['flags']:
        # Message fixed: previously read "flag bit ckeared".
        raise Exception("WPS-PBC flag not cleared from AP2")
115
def test_ap_wps_init_2ap_pin(dev, apdev):
    """Initial two-radio AP configuration with first WPS PIN Enrollee"""
    # Two APs are started with identical parameters. The enrollee PIN is
    # registered only through the first AP's control interface, yet both
    # BSSes are expected to show [WPS-AUTH] before the exchange and neither
    # after it.
    ssid = "test-wps"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    hapd.request("WPS_PIN any " + pin)
    dev[0].scan(freq="2412")
    bss = dev[0].get_bss(apdev[0]['bssid'])
    if "[WPS-AUTH]" not in bss['flags']:
        raise Exception("WPS-AUTH flag missing from AP1")
    bss = dev[0].get_bss(apdev[1]['bssid'])
    if "[WPS-AUTH]" not in bss['flags']:
        raise Exception("WPS-AUTH flag missing from AP2")
    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # A second station scans afresh to verify that the PIN indication is gone
    # from both BSSes once provisioning has completed.
    dev[1].scan(freq="2412")
    bss = dev[1].get_bss(apdev[0]['bssid'])
    if "[WPS-AUTH]" in bss['flags']:
        raise Exception("WPS-AUTH flag not cleared from AP1")
    bss = dev[1].get_bss(apdev[1]['bssid'])
    if "[WPS-AUTH]" in bss['flags']:
        # Message fixed: previously read "flag bit ckeared".
        raise Exception("WPS-AUTH flag not cleared from AP2")
146
def test_ap_wps_init_through_wps_config(dev, apdev):
    """Initial AP configuration using wps_config command"""
    # The AP is configured locally through the WPS_CONFIG control command
    # (hex-encoded SSID and passphrase); the station then connects with a
    # regular WPA2-PSK profile using the same credentials.
    ssid = "test-wps-init-config"
    passphrase = "12345678"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
    hapd = hostapd.Hostapd(ifname)
    config_cmd = ("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " +
                  passphrase.encode("hex"))
    if "FAIL" in hapd.request(config_cmd):
        raise Exception("WPS_CONFIG command failed")
    new_settings = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5)
    if new_settings is None:
        raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")
    # It takes some time for the AP to update Beacon and Probe Response frames,
    # so wait here before requesting the scan to be started to avoid adding
    # extra five second wait to the test due to fetching obsolete scan results.
    hapd.ping()
    time.sleep(0.2)
    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", proto="WPA2",
                   pairwise="CCMP", group="CCMP")
165
def test_ap_wps_conf(dev, apdev):
    """WPS PBC provisioning with configured AP"""
    # PBC is started on both the AP and the station; afterwards the station
    # status and the AP's STA entry for the station are verified.
    ssid = "test-wps-conf"
    ifname = apdev[0]['ifname']
    sta = dev[0]
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta.dump_monitor()
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if status['bssid'] != apdev[0]['bssid']:
        raise Exception("Unexpected BSSID")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    ccmp_only = (status['pairwise_cipher'] == 'CCMP' and
                 status['group_cipher'] == 'CCMP')
    if not ccmp_only:
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # The AP's STA entry is expected to carry the enrollee's device name.
    sta_info = hapd.get_sta(sta.p2p_interface_addr())
    name_ok = ('wpsDeviceName' in sta_info and
               sta_info['wpsDeviceName'] == "Device A")
    if not name_ok:
        raise Exception("Device name not available in STA command")
196
def test_ap_wps_conf_5ghz(dev, apdev):
    """WPS PBC provisioning with configured AP on 5 GHz band"""
    # The AP is brought up with country_code FI, hw_mode a, channel 36. The
    # finally clause always resets the regulatory domain to 00 with iw.
    try:
        ssid = "test-wps-conf"
        ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                      "wpa_passphrase": "12345678", "wpa": "2",
                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                      "country_code": "FI", "hw_mode": "a", "channel": "36" }
        hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)
        sta = dev[0]
        logger.info("WPS provisioning step")
        hapd.request("WPS_PBC")
        sta.request("WPS_PBC")
        if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
            raise Exception("Association with the AP timed out")

        sta_info = hapd.get_sta(sta.p2p_interface_addr())
        name_ok = ('wpsDeviceName' in sta_info and
                   sta_info['wpsDeviceName'] == "Device A")
        if not name_ok:
            raise Exception("Device name not available in STA command")
    finally:
        subprocess.call(['sudo', 'iw', 'reg', 'set', '00'])
218
def test_ap_wps_conf_chan14(dev, apdev):
    """WPS PBC provisioning with configured AP on channel 14"""
    # The AP is brought up with country_code JP, hw_mode b, channel 14. The
    # finally clause always resets the regulatory domain to 00 with iw.
    try:
        ssid = "test-wps-conf"
        ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                      "wpa_passphrase": "12345678", "wpa": "2",
                      "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                      "country_code": "JP", "hw_mode": "b", "channel": "14" }
        hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)
        sta = dev[0]
        logger.info("WPS provisioning step")
        hapd.request("WPS_PBC")
        sta.request("WPS_PBC")
        if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
            raise Exception("Association with the AP timed out")

        sta_info = hapd.get_sta(sta.p2p_interface_addr())
        name_ok = ('wpsDeviceName' in sta_info and
                   sta_info['wpsDeviceName'] == "Device A")
        if not name_ok:
            raise Exception("Device name not available in STA command")
    finally:
        subprocess.call(['sudo', 'iw', 'reg', 'set', '00'])
240
def test_ap_wps_twice(dev, apdev):
    """WPS provisioning with twice to change passphrase"""
    # Round 1 provisions the station through PBC. The AP is then removed and
    # re-added with a different passphrase and PBC is run again; the station
    # must not end up with more than one network block.
    ssid = "test-wps-twice"
    ifname = apdev[0]['ifname']
    sta = dev[0]
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta.dump_monitor()
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
    sta.request("DISCONNECT")

    logger.info("Restart AP with different passphrase and re-run WPS")
    hostapd.HostapdGlobal().remove(ifname)
    params['wpa_passphrase'] = 'another passphrase'
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta.dump_monitor()
    sta.request("WPS_PBC any")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
    if len(sta.list_networks()) > 1:
        raise Exception("Unexpected duplicated network block present")
274
def test_ap_wps_incorrect_pin(dev, apdev):
    """WPS PIN provisioning with incorrect PIN"""
    # The AP is always given PIN 12345670. Attempt 1 uses a station PIN whose
    # first four digits already differ (failure expected with msg=8); attempt
    # 2 uses one that matches in the first four digits only (msg=10).
    ssid = "test-wps-incorrect-pin"
    ifname = apdev[0]['ifname']
    sta = dev[0]
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(ifname)

    def run_failing_attempt(sta_pin, msg_tag):
        # One PIN attempt that must end in WPS-FAIL with config_error=18 and
        # the given msg tag, followed by a disconnection event.
        hapd.request("WPS_PIN any 12345670")
        sta.dump_monitor()
        sta.request("WPS_PIN any " + sta_pin)
        failure = sta.wait_event(["WPS-FAIL"], timeout=30)
        if failure is None:
            raise Exception("WPS operation timed out")
        if "config_error=18" not in failure:
            raise Exception("Incorrect config_error reported")
        if msg_tag not in failure:
            raise Exception("PIN error detected on incorrect message")
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")

    logger.info("WPS provisioning attempt 1")
    run_failing_attempt("55554444", "msg=8")
    sta.request("WPS_CANCEL")
    # if a scan was in progress, wait for it to complete before trying WPS again
    sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)

    wps_report = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in wps_report:
        raise Exception("WPS failure result not shown correctly")

    logger.info("WPS provisioning attempt 2")
    run_failing_attempt("12344444", "msg=10")
320
def test_ap_wps_conf_pin(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    # Part 1: dev[0] enrolls with a PIN known to the AP and its connection is
    # verified. Part 2: dev[1] starts PIN enrollment before the AP has been
    # given its PIN (WPS-M2D expected), then connects once the AP has it.
    ssid = "test-wps-conf-pin"
    ifname = apdev[0]['ifname']
    first, second = dev[0], dev[1]
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    pin = first.wps_read_pin()
    hapd.request("WPS_PIN any " + pin)
    first.dump_monitor()
    first.request("WPS_PIN any " + pin)
    if first.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    status = first.get_status()
    fully_connected = (status['wpa_state'] == 'COMPLETED' and
                       status['bssid'] == apdev[0]['bssid'])
    if not fully_connected:
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    ccmp_only = (status['pairwise_cipher'] == 'CCMP' and
                 status['group_cipher'] == 'CCMP')
    if not ccmp_only:
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    second.scan(freq="2412")
    bss = second.get_bss(apdev[0]['bssid'])
    if "[WPS-AUTH]" in bss['flags']:
        raise Exception("WPS-AUTH flag not cleared")
    logger.info("Try to connect from another station using the same PIN")
    # The reply to "WPS_PIN any" is used as the PIN handed to the AP below.
    pin = second.request("WPS_PIN any")
    outcome = second.wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"], timeout=30)
    if outcome is None:
        raise Exception("Operation timed out")
    if "WPS-M2D" not in outcome:
        raise Exception("Unexpected WPS operation started")
    hapd.request("WPS_PIN any " + pin)
    if second.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
362
def test_ap_wps_conf_pin_2sta(dev, apdev):
    """Two stations trying to use WPS PIN at the same time"""
    # The same PIN is registered on the AP once per station UUID, then both
    # stations start PIN enrollment back to back and both must connect.
    # NOTE(review): an unused local pin2 = "55554444" was removed here; if the
    # second station was meant to use a different PIN, that was never wired up.
    ssid = "test-wps-conf-pin2"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    pin = "12345670"
    hapd.request("WPS_PIN " + dev[0].get_status_field("uuid") + " " + pin)
    hapd.request("WPS_PIN " + dev[1].get_status_field("uuid") + " " + pin)
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    dev[0].request("WPS_PIN any " + pin)
    dev[1].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
    ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
386
def test_ap_wps_conf_pin_timeout(dev, apdev):
    """WPS PIN provisioning with configured AP timing out PIN"""
    # The PIN is first registered with a trailing "1" argument and used only
    # after a 1.1 s sleep, at which point the AP must report WPS-PIN-NEEDED
    # and the station WPS-M2D. It is then re-registered with "20" plus the
    # station address, and the enrollment must succeed.
    ssid = "test-wps-conf-pin"
    ifname = apdev[0]['ifname']
    sta = dev[0]
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(ifname)
    addr = sta.p2p_interface_addr()
    pin = sta.wps_read_pin()
    pin_cmd = "WPS_PIN any " + pin

    # A WPS_PIN command without arguments must be rejected.
    if "FAIL" not in hapd.request("WPS_PIN "):
        raise Exception("Unexpected success on invalid WPS_PIN")

    hapd.request(pin_cmd + " 1")
    time.sleep(1.1)
    sta.request(pin_cmd)
    if hapd.wait_event(["WPS-PIN-NEEDED"], timeout=20) is None:
        raise Exception("WPS-PIN-NEEDED event timed out")
    if sta.wait_event(["WPS-M2D"]) is None:
        raise Exception("M2D not reported")
    sta.request("WPS_CANCEL")

    hapd.request(pin_cmd + " 20 " + addr)
    sta.request(pin_cmd)
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
415
def test_ap_wps_reg_connect(dev, apdev):
    """WPS registrar using AP PIN to connect"""
    # The station runs wps_reg() with the AP PIN against an AP that already
    # has WPA2-PSK/CCMP configured, then its connection status is verified.
    ssid = "test-wps-reg-ap-pin"
    appin = "12345670"
    sta = dev[0]
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "ap_pin": appin}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    logger.info("WPS provisioning step")
    sta.dump_monitor()
    sta.wps_reg(apdev[0]['bssid'], appin)

    status = sta.get_status()
    fully_connected = (status['wpa_state'] == 'COMPLETED' and
                       status['bssid'] == apdev[0]['bssid'])
    if not fully_connected:
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    ccmp_only = (status['pairwise_cipher'] == 'CCMP' and
                 status['group_cipher'] == 'CCMP')
    if not ccmp_only:
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
437
def check_wps_reg_failure(dev, ap, appin):
    # Helper: issue WPS_REG for ap['bssid'] with the given AP PIN on dev and
    # require the attempt to end in a WPS-FAIL event carrying
    # config_error=15. Raises Exception on timeout (15 s), on WPS-SUCCESS, or
    # when the failure event lacks that config error.
    dev.request(" ".join(["WPS_REG", ap['bssid'], appin]))
    outcome = dev.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout=15)
    if outcome is None:
        raise Exception("WPS operation timed out")
    if "WPS-SUCCESS" in outcome:
        raise Exception("WPS operation succeeded unexpectedly")
    if "config_error=15" not in outcome:
        raise Exception("WPS setup locked state was not reported correctly")
447
def test_ap_wps_random_ap_pin(dev, apdev):
    """WPS registrar using random AP PIN"""
    # Phases: (1) a random AP PIN works for wps_reg(); (2) after "disable"
    # the same PIN must fail; (3) an explicitly set PIN works; (4) and (5) a
    # random resp. explicitly set PIN given a trailing "1" argument must be
    # gone after a 1.1 s sleep and must fail.
    ssid = "test-wps-reg-random-ap-pin"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ifname = apdev[0]['ifname']
    bssid = apdev[0]['bssid']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "device_name": "Wireless AP", "manufacturer": "Company",
                  "model_name": "WAP", "model_number": "123",
                  "serial_number": "12345", "device_type": "6-0050F204-1",
                  "os_version": "01020300",
                  "config_methods": "label push_button",
                  "uuid": ap_uuid, "upnp_iface": "lo" }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    appin = hapd.request("WPS_AP_PIN random")
    if "FAIL" in appin:
        raise Exception("Could not generate random AP PIN")
    current_pin = hapd.request("WPS_AP_PIN get")
    if appin not in current_pin:
        raise Exception("Could not fetch current AP PIN")
    logger.info("WPS provisioning step")
    dev[0].wps_reg(bssid, appin)

    hapd.request("WPS_AP_PIN disable")
    logger.info("WPS provisioning step with AP PIN disabled")
    check_wps_reg_failure(dev[1], apdev[0], appin)

    logger.info("WPS provisioning step with AP PIN reset")
    appin = "12345670"
    hapd.request("WPS_AP_PIN set " + appin)
    dev[1].wps_reg(bssid, appin)
    # Drop both stations' networks before the timeout phases.
    dev[0].request("REMOVE_NETWORK all")
    dev[1].request("REMOVE_NETWORK all")
    dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
    dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])

    logger.info("WPS provisioning step after AP PIN timeout")
    hapd.request("WPS_AP_PIN disable")
    appin = hapd.request("WPS_AP_PIN random 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[0], apdev[0], appin)

    logger.info("WPS provisioning step after AP PIN timeout(2)")
    hapd.request("WPS_AP_PIN disable")
    appin = "12345670"
    hapd.request("WPS_AP_PIN set " + appin + " 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[1], apdev[0], appin)
500
def test_ap_wps_reg_config(dev, apdev):
    """WPS registrar configuring an AP using AP PIN"""
    # Step 1: the station uses the AP PIN to push a new WPA2PSK/CCMP
    # configuration and verifies its connection. Step 2: after flushing the
    # BSS table and rescanning, it reconfigures the AP to an open network.
    ssid = "test-wps-init-ap-pin"
    appin = "12345670"
    sta = dev[0]
    bssid = apdev[0]['bssid']
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "ap_pin": appin})
    logger.info("WPS configuration step")
    sta.dump_monitor()
    new_ssid = "wps-new-ssid"
    new_passphrase = "1234567890"
    sta.wps_reg(bssid, appin, new_ssid, "WPA2PSK", "CCMP", new_passphrase)
    status = sta.get_status()
    fully_connected = (status['wpa_state'] == 'COMPLETED' and
                       status['bssid'] == apdev[0]['bssid'])
    if not fully_connected:
        raise Exception("Not fully connected")
    if status['ssid'] != new_ssid:
        raise Exception("Unexpected SSID")
    ccmp_only = (status['pairwise_cipher'] == 'CCMP' and
                 status['group_cipher'] == 'CCMP')
    if not ccmp_only:
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    logger.info("Re-configure back to open")
    for cmd in ("REMOVE_NETWORK all", "BSS_FLUSH 0",
                "SCAN freq=2412 only_new=1"):
        sta.request(cmd)
    if sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
    sta.dump_monitor()
    sta.wps_reg(bssid, appin, "wps-open", "OPEN", "NONE", "")
    status = sta.get_status()
    fully_connected = (status['wpa_state'] == 'COMPLETED' and
                       status['bssid'] == apdev[0]['bssid'])
    if not fully_connected:
        raise Exception("Not fully connected")
    if status['ssid'] != "wps-open":
        raise Exception("Unexpected SSID")
    if status['key_mgmt'] != 'NONE':
        raise Exception("Unexpected key_mgmt")
540
def test_ap_wps_reg_config_ext_processing(dev, apdev):
    """WPS registrar configuring an AP with external config processing"""
    # The AP runs with wps_cred_processing=1. After the registrar exchange
    # the test expects a WPS-NEW-AP-SETTINGS event containing "1026", then
    # applies the new settings itself with WPS_CONFIG and waits for the
    # station to connect.
    ssid = "test-wps-init-ap-pin"
    appin = "12345670"
    sta = dev[0]
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wps_cred_processing": "1", "ap_pin": appin}
    hapd = hostapd.add_ap(apdev[0]['ifname'], ap_params)
    new_ssid = "wps-new-ssid"
    new_passphrase = "1234567890"
    sta.wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
                new_passphrase, no_wait=True)
    if sta.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS registrar operation timed out")
    settings_event = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=15)
    if settings_event is None:
        raise Exception("WPS configuration timed out")
    if "1026" not in settings_event:
        raise Exception("AP Settings missing from event")

    hapd.request("SET wps_cred_processing 0")
    config_cmd = ("WPS_CONFIG " + new_ssid.encode("hex") + " WPA2PSK CCMP " +
                  new_passphrase.encode("hex"))
    if "FAIL" in hapd.request(config_cmd):
        raise Exception("WPS_CONFIG command failed")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
566
def test_ap_wps_reg_config_tkip(dev, apdev):
    """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP"""
    # The station (with wps_version_number set to 0x10) configures the AP
    # for WPAPSK/TKIP. It then reconnects using network id 0 forced to
    # RSN/CCMP and expects pairwise CCMP with group TKIP under WPA2-PSK.
    ssid = "test-wps-init-ap"
    appin = "12345670"
    sta = dev[0]
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1",
                     "ap_pin": appin})
    logger.info("WPS configuration step")
    sta.request("SET wps_version_number 0x10")
    sta.dump_monitor()
    new_ssid = "wps-new-ssid-with-tkip"
    new_passphrase = "1234567890"
    sta.wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPAPSK", "TKIP",
                new_passphrase)
    logger.info("Re-connect to verify WPA2 mixed mode")
    sta.request("DISCONNECT")
    netid = 0
    sta.set_network(netid, "pairwise", "CCMP")
    sta.set_network(netid, "proto", "RSN")
    sta.connect_network(netid)

    status = sta.get_status()
    fully_connected = (status['wpa_state'] == 'COMPLETED' and
                       status['bssid'] == apdev[0]['bssid'])
    if not fully_connected:
        raise Exception("Not fully connected")
    if status['ssid'] != new_ssid:
        raise Exception("Unexpected SSID")
    mixed_mode = (status['pairwise_cipher'] == 'CCMP' and
                  status['group_cipher'] == 'TKIP')
    if not mixed_mode:
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
596
def test_ap_wps_setup_locked(dev, apdev):
    """WPS registrar locking up AP setup on AP PIN failures"""
    # Five wrong AP PINs are tried in a row; each must fail with
    # config_error=18 or 15, and at least one must report 15. Afterwards
    # regular enrollee PIN provisioning must still succeed, and generating a
    # new random AP PIN must produce WPS-AP-SETUP-UNLOCKED.
    ssid = "test-wps-incorrect-ap-pin"
    appin = "12345670"
    ifname = apdev[0]['ifname']
    sta = dev[0]
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "ap_pin": appin})
    new_ssid = "wps-new-ssid-test"
    new_passphrase = "1234567890"

    wrong_pins = ["55554444", "1234", "12345678", "00000000", "11111111"]
    ap_setup_locked = False
    for pin in wrong_pins:
        sta.dump_monitor()
        logger.info("Try incorrect AP PIN - attempt " + pin)
        sta.wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
                    "CCMP", new_passphrase, no_wait=True)
        outcome = sta.wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
        if outcome is None:
            raise Exception("Timeout on receiving WPS operation failure event")
        if "CTRL-EVENT-CONNECTED" in outcome:
            raise Exception("Unexpected connection")
        if "config_error=15" in outcome:
            # No break here: the remaining PINs are still attempted.
            logger.info("AP Setup Locked")
            ap_setup_locked = True
        elif "config_error=18" not in outcome:
            raise Exception("config_error=18 not reported")
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")
        time.sleep(0.1)
    if not ap_setup_locked:
        raise Exception("AP setup was not locked")

    hapd = hostapd.Hostapd(ifname)
    wps_report = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in wps_report:
        raise Exception("WPS failure result not shown correctly")
    peer_line = "Peer Address: " + sta.p2p_interface_addr()
    if peer_line not in wps_report:
        raise Exception("Peer address not shown correctly")

    time.sleep(0.5)
    sta.dump_monitor()
    logger.info("WPS provisioning step")
    pin = sta.wps_read_pin()
    # A fresh Hostapd control object is created for the remaining steps.
    hapd = hostapd.Hostapd(ifname)
    hapd.request("WPS_PIN any " + pin)
    sta.request("WPS_PIN any " + pin)
    if sta.wait_event(["WPS-SUCCESS"], timeout=30) is None:
        raise Exception("WPS success was not reported")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    appin = hapd.request("WPS_AP_PIN random")
    if "FAIL" in appin:
        raise Exception("Could not generate random AP PIN")
    if hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=10) is None:
        raise Exception("Failed to unlock AP PIN")
659
def test_ap_wps_setup_locked_timeout(dev, apdev):
    """WPS re-enabling AP PIN after timeout"""
    # Wrong AP PINs are tried until config_error=15 is reported (the loop
    # stops at that point); the AP must then emit WPS-AP-SETUP-UNLOCKED on
    # its own within the 80 s wait.
    ssid = "test-wps-incorrect-ap-pin"
    appin = "12345670"
    ifname = apdev[0]['ifname']
    sta = dev[0]
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "ap_pin": appin})
    new_ssid = "wps-new-ssid-test"
    new_passphrase = "1234567890"

    wrong_pins = ["55554444", "1234", "12345678", "00000000", "11111111"]
    ap_setup_locked = False
    for pin in wrong_pins:
        sta.dump_monitor()
        logger.info("Try incorrect AP PIN - attempt " + pin)
        sta.wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
                    "CCMP", new_passphrase, no_wait=True)
        outcome = sta.wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
        if outcome is None:
            raise Exception("Timeout on receiving WPS operation failure event")
        if "CTRL-EVENT-CONNECTED" in outcome:
            raise Exception("Unexpected connection")
        if "config_error=15" in outcome:
            logger.info("AP Setup Locked")
            ap_setup_locked = True
            break
        elif "config_error=18" not in outcome:
            raise Exception("config_error=18 not reported")
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")
        time.sleep(0.1)
    if not ap_setup_locked:
        raise Exception("AP setup was not locked")

    hapd = hostapd.Hostapd(ifname)
    if hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=80) is None:
        raise Exception("AP PIN did not get unlocked on 60 second timeout")
699
def test_ap_wps_pbc_overlap_2ap(dev, apdev):
    """WPS PBC session overlap with two active APs"""
    # Two APs (both with wps_independent=1) have PBC active at the same
    # time; the station's own WPS_PBC must then report WPS-OVERLAP-DETECTED.
    ap1_params = { "ssid": "wps1", "eap_server": "1", "wps_state": "2",
                   "wpa_passphrase": "12345678", "wpa": "2",
                   "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                   "wps_independent": "1"}
    ap2_params = { "ssid": "wps2", "eap_server": "1", "wps_state": "2",
                   "wpa_passphrase": "123456789", "wpa": "2",
                   "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                   "wps_independent": "1"}
    hostapd.add_ap(apdev[0]['ifname'], ap1_params)
    hostapd.add_ap(apdev[1]['ifname'], ap2_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    hapd.request("WPS_PBC")
    hapd2 = hostapd.Hostapd(apdev[1]['ifname'])
    hapd2.request("WPS_PBC")
    logger.info("WPS provisioning step")
    sta = dev[0]
    sta.dump_monitor()
    sta.request("WPS_PBC")
    if sta.wait_event(["WPS-OVERLAP-DETECTED"], timeout=15) is None:
        raise Exception("PBC session overlap not detected")
722
def test_ap_wps_pbc_overlap_2sta(dev, apdev):
    """WPS PBC session overlap with two active STAs"""
    # Both stations start PBC against the same AP; each must receive WPS-M2D
    # with config_error=12. After WPS_CANCEL on the AP, a new WPS_PBC request
    # must still be refused.
    ssid = "test-wps-pbc-overlap"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    dev[0].request("WPS_PBC")
    dev[1].request("WPS_PBC")

    m2d = dev[0].wait_event(["WPS-M2D"], timeout=15)
    if m2d is None:
        raise Exception("PBC session overlap not detected (dev0)")
    if "config_error=12" not in m2d:
        raise Exception("PBC session overlap not correctly reported (dev0)")
    m2d = dev[1].wait_event(["WPS-M2D"], timeout=15)
    if m2d is None:
        raise Exception("PBC session overlap not detected (dev1)")
    if "config_error=12" not in m2d:
        raise Exception("PBC session overlap not correctly reported (dev1)")

    hapd.request("WPS_CANCEL")
    if "FAIL" not in hapd.request("WPS_PBC"):
        raise Exception("PBC mode allowed to be started while PBC overlap still active")
751
def test_ap_wps_cancel(dev, apdev):
    """WPS AP cancelling enabled config method"""
    # For each method (PBC, then PIN): enable it on the AP, confirm by scan
    # that the matching BSS flag appears, cancel with WPS_CANCEL, and confirm
    # by a second scan that the flag is gone.
    ssid = "test-wps-ap-cancel"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
    bssid = apdev[0]['bssid']
    hapd = hostapd.Hostapd(ifname)
    sta = dev[0]

    # (log line, enabling command, BSS flag, "missing" error, "stuck" error)
    phases = [
        ("Verify PBC enable/cancel", "WPS_PBC", "[WPS-PBC]",
         "WPS-PBC flag missing", "WPS-PBC flag not cleared"),
        ("Verify PIN enable/cancel", "WPS_PIN any 12345670", "[WPS-AUTH]",
         "WPS-AUTH flag missing", "WPS-AUTH flag not cleared"),
    ]
    for title, enable_cmd, flag, missing_msg, stuck_msg in phases:
        logger.info(title)
        hapd.request(enable_cmd)
        sta.scan(freq="2412")
        bss = sta.get_bss(bssid)
        if flag not in bss['flags']:
            raise Exception(missing_msg)
        if "FAIL" in hapd.request("WPS_CANCEL"):
            raise Exception("WPS_CANCEL failed")
        sta.scan(freq="2412")
        bss = sta.get_bss(bssid)
        if flag in bss['flags']:
            raise Exception(stuck_msg)
787
def test_ap_wps_er_add_enrollee(dev, apdev):
    """WPS ER configuring AP and adding a new enrollee using PIN"""
    ssid = "wps-er-add-enrollee"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    bssid = apdev[0]['bssid']
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1",
                     "device_name": "Wireless AP", "manufacturer": "Company",
                     "model_name": "WAP", "model_number": "123",
                     "serial_number": "12345", "device_type": "6-0050F204-1",
                     "os_version": "01020300",
                     "config_methods": "label push_button",
                     "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})

    # dev[0] configures the AP using the AP PIN and ends up connected to it
    logger.info("WPS configuration step")
    new_passphrase = "1234567890"
    dev[0].dump_monitor()
    dev[0].wps_reg(bssid, ap_pin, ssid, "WPA2PSK", "CCMP", new_passphrase)
    status = dev[0].get_status()
    if status['wpa_state'] != 'COMPLETED' or status['bssid'] != bssid:
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    logger.info("Start ER")
    dev[0].request("WPS_ER_START ifname=lo")
    ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ev is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")

    # The learned settings must match what was configured above
    logger.info("Learn AP configuration through UPnP")
    dev[0].dump_monitor()
    dev[0].request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
    ev = dev[0].wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
    if ev is None:
        raise Exception("AP learn timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not in settings")
    if "ssid=" + ssid not in ev:
        raise Exception("Expected SSID not in settings")
    if "key=" + new_passphrase not in ev:
        raise Exception("Expected passphrase not in settings")

    # Wildcard ("any") UUID: the PIN is registered on the ER before dev[1]
    # starts its PIN enrollment
    logger.info("Add Enrollee using ER")
    pin = dev[1].wps_read_pin()
    dev[0].dump_monitor()
    dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
    dev[1].dump_monitor()
    dev[1].request("WPS_PIN any " + pin)
    ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=30)
    if ev is None:
        raise Exception("Enrollee did not report success")
    ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
    if ev is None:
        raise Exception("Association with the AP timed out")
    ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
    if ev is None:
        raise Exception("WPS ER did not report success")
    hwsim_utils.test_connectivity_sta(dev[0], dev[1])

    # Here the enrollee starts first; the ER registers the PIN only after it
    # has seen the enrollee and verified its address
    logger.info("Add a specific Enrollee using ER")
    pin = dev[2].wps_read_pin()
    addr2 = dev[2].p2p_interface_addr()
    dev[0].dump_monitor()
    dev[2].dump_monitor()
    dev[2].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10)
    if ev is None:
        raise Exception("Enrollee not seen")
    if addr2 not in ev:
        raise Exception("Unexpected Enrollee MAC address")
    dev[0].request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2)
    ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
    if ev is None:
        raise Exception("Association with the AP timed out")
    ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
    if ev is None:
        raise Exception("WPS ER did not report success")

    logger.info("Verify registrar selection behavior")
    dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
    dev[1].request("DISCONNECT")
    # Make sure dev[1] really left the AP before using it as a scanner; a
    # missed disconnection used to go unnoticed here.
    ev = dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
    if ev is None:
        raise Exception("Disconnection from the AP timed out")
    dev[1].scan(freq="2412")
    bss = dev[1].get_bss(bssid)
    if "[WPS-AUTH]" not in bss['flags']:
        raise Exception("WPS-AUTH flag missing")

    logger.info("Stop ER")
    dev[0].dump_monitor()
    dev[0].request("WPS_ER_STOP")
    ev = dev[0].wait_event(["WPS-ER-AP-REMOVE"])
    if ev is None:
        raise Exception("WPS ER unsubscription timed out")
    # It takes some time for the UPnP UNSUBSCRIBE command to go through, so wait
    # a bit before verifying that the scan results have changed.
    time.sleep(0.2)

    dev[1].scan(freq="2412")
    bss = dev[1].get_bss(bssid)
    if "[WPS-AUTH]" in bss['flags']:
        raise Exception("WPS-AUTH flag not removed")
896
def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
    """WPS ER connected to AP and adding a new enrollee using PBC"""
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ifname = apdev[0]['ifname']
    ap_params = {
        "ssid": "wps-er-add-enrollee-pbc",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "device_name": "Wireless AP",
        "manufacturer": "Company",
        "model_name": "WAP",
        "model_number": "123",
        "serial_number": "12345",
        "device_type": "6-0050F204-1",
        "os_version": "01020300",
        "config_methods": "label push_button",
        "ap_pin": ap_pin,
        "uuid": ap_uuid,
        "upnp_iface": "lo",
    }
    hostapd.add_ap(ifname, ap_params)
    bssid = apdev[0]['bssid']

    logger.info("Learn AP configuration")
    dev[0].dump_monitor()
    dev[0].wps_reg(bssid, ap_pin)
    state = dev[0].get_status()
    if not (state['wpa_state'] == 'COMPLETED' and state['bssid'] == bssid):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    dev[0].request("WPS_ER_START ifname=lo")
    ap_add = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ap_add is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ap_add:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PBC")
    dev[0].dump_monitor()
    enrollee = dev[1].p2p_interface_addr()
    dev[1].dump_monitor()
    dev[1].request("WPS_PBC")

    # Accept the expected enrollee in either of the first two
    # WPS-ER-ENROLLEE-ADD events; give up after the second one.
    for _ in range(2):
        seen = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=15)
        if seen is None:
            raise Exception("Enrollee discovery timed out")
        if enrollee in seen:
            break
    else:
        raise Exception("Expected Enrollee not found")
    dev[0].request("WPS_ER_PBC " + enrollee)

    # (device, event to wait for, error raised when it does not arrive)
    expected_events = (
        (dev[1], "WPS-SUCCESS", "Enrollee did not report success"),
        (dev[1], "CTRL-EVENT-CONNECTED", "Association with the AP timed out"),
        (dev[0], "WPS-SUCCESS", "WPS ER did not report success"),
    )
    for device, event, failure in expected_events:
        if device.wait_event([event], timeout=15) is None:
            raise Exception(failure)
    hwsim_utils.test_connectivity_sta(dev[0], dev[1])

    # verify BSSID selection of the AP instead of UUID
    if "FAIL" in dev[0].request("WPS_ER_SET_CONFIG " + bssid + " 0"):
        raise Exception("Could not select AP based on BSSID")
960
def test_ap_wps_er_v10_add_enrollee_pin(dev, apdev):
    """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ifname = apdev[0]['ifname']
    ap_params = {
        "ssid": "wps-er-add-enrollee-pbc",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "device_name": "Wireless AP",
        "manufacturer": "Company",
        "model_name": "WAP",
        "model_number": "123",
        "serial_number": "12345",
        "device_type": "6-0050F204-1",
        "os_version": "01020300",
        "config_methods": "label push_button",
        "ap_pin": ap_pin,
        "uuid": ap_uuid,
        "upnp_iface": "lo",
    }
    hostapd.add_ap(ifname, ap_params)
    bssid = apdev[0]['bssid']

    logger.info("Learn AP configuration")
    # The version number is changed on the ER side before the registrar
    # exchange with the AP is started.
    dev[0].request("SET wps_version_number 0x10")
    dev[0].dump_monitor()
    dev[0].wps_reg(bssid, ap_pin)
    state = dev[0].get_status()
    if not (state['wpa_state'] == 'COMPLETED' and state['bssid'] == bssid):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    dev[0].request("WPS_ER_START ifname=lo")
    ap_add = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ap_add is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ap_add:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PIN")
    enrollee = dev[1].p2p_interface_addr()
    pin = dev[1].wps_read_pin()
    dev[0].dump_monitor()
    dev[0].request("WPS_ER_PIN any " + pin + " " + enrollee)
    dev[1].dump_monitor()
    dev[1].request("WPS_PIN any " + pin)
    if dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if dev[0].wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")
1008
def test_ap_wps_er_config_ap(dev, apdev):
    """WPS ER configuring AP over UPnP"""
    ssid = "wps-er-ap-config"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "device_name": "Wireless AP", "manufacturer": "Company",
                     "model_name": "WAP", "model_number": "123",
                     "serial_number": "12345", "device_type": "6-0050F204-1",
                     "os_version": "01020300",
                     "config_methods": "label push_button",
                     "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})

    logger.info("Connect ER to the AP")
    dev[0].connect(ssid, psk="12345678", scan_freq="2412")

    logger.info("WPS configuration step")
    dev[0].request("WPS_ER_START ifname=lo")
    ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ev is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")
    new_passphrase = "1234567890"
    # SSID and passphrase are passed to WPS_ER_CONFIG as hex strings
    dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
                   ssid.encode("hex") + " WPA2PSK CCMP " +
                   new_passphrase.encode("hex"))
    ev = dev[0].wait_event(["WPS-SUCCESS"])
    if ev is None:
        raise Exception("WPS ER configuration operation timed out")
    # dev[0] is the only station associated in this test, so it is the one
    # whose disconnection has to be awaited (the wait used to be on dev[1],
    # which never connects here). The result is intentionally not checked:
    # the connect() call below validates the final state with the new
    # passphrase.
    dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
    dev[0].connect(ssid, psk=new_passphrase, scan_freq="2412")
1044
def test_ap_wps_fragmentation(dev, apdev):
    """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
    appin = "12345670"
    ifname = apdev[0]['ifname']
    ap_params = {
        "ssid": "test-wps-fragmentation",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "3",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "wpa_pairwise": "TKIP",
        "ap_pin": appin,
        "fragment_size": "50",
    }
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    def verify_link(sta):
        # Every station must end up connected with CCMP pairwise, TKIP group
        # and WPA2-PSK key management.
        info = sta.get_status()
        if info['wpa_state'] != 'COMPLETED':
            raise Exception("Not fully connected")
        if not (info['pairwise_cipher'] == 'CCMP' and
                info['group_cipher'] == 'TKIP'):
            raise Exception("Unexpected encryption configuration")
        if info['key_mgmt'] != 'WPA2-PSK':
            raise Exception("Unexpected key_mgmt")

    logger.info("WPS provisioning step (PBC)")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("SET wps_fragment_size 50")
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
    verify_link(dev[0])

    logger.info("WPS provisioning step (PIN)")
    pin = dev[1].wps_read_pin()
    hapd.request("WPS_PIN any " + pin)
    dev[1].request("SET wps_fragment_size 50")
    dev[1].request("WPS_PIN any " + pin)
    if dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
    verify_link(dev[1])

    logger.info("WPS connection as registrar")
    dev[2].request("SET wps_fragment_size 50")
    dev[2].wps_reg(apdev[0]['bssid'], appin)
    verify_link(dev[2])
1098
def test_ap_wps_new_version_sta(dev, apdev):
    """WPS compatibility with new version number on the station"""
    ifname = apdev[0]['ifname']
    ap_params = {"ssid": "test-wps-ver",
                 "eap_server": "1",
                 "wps_state": "2",
                 "wpa_passphrase": "12345678",
                 "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK",
                 "rsn_pairwise": "CCMP"}
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    # Station-side test settings are applied first, then PBC is started
    station_cmds = ("SET wps_version_number 0x43",
                    "SET wps_vendor_ext_m1 000137100100020001",
                    "WPS_PBC")
    for cmd in station_cmds:
        dev[0].request(cmd)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
1116
def test_ap_wps_new_version_ap(dev, apdev):
    """WPS compatibility with new version number on the AP"""
    ifname = apdev[0]['ifname']
    ap_params = {"ssid": "test-wps-ver",
                 "eap_server": "1",
                 "wps_state": "2",
                 "wpa_passphrase": "12345678",
                 "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK",
                 "rsn_pairwise": "CCMP"}
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    set_result = hapd.request("SET wps_version_number 0x43")
    if "FAIL" in set_result:
        raise Exception("Failed to enable test functionality")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    connected = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    # Put the AP's version number back to 0x20 before evaluating the result,
    # so this happens even when the association timed out.
    hapd.request("SET wps_version_number 0x20")
    if connected is None:
        raise Exception("Association with the AP timed out")
1135
def test_ap_wps_check_pin(dev, apdev):
    """Verify PIN checking through control interface"""
    ifname = apdev[0]['ifname']
    ap_params = {"ssid": "wps",
                 "eap_server": "1",
                 "wps_state": "2",
                 "wpa_passphrase": "12345678",
                 "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK",
                 "rsn_pairwise": "CCMP"}
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    # (PIN as entered, expected WPS_CHECK_PIN response)
    expectations = [
        ("12345670", "12345670"),
        ("12345678", "FAIL-CHECKSUM"),
        ("12345", "FAIL"),
        ("123456789", "FAIL"),
        ("1234-5670", "12345670"),
        ("1234 5670", "12345670"),
        ("1-2.3:4 5670", "12345670"),
    ]
    for entered, expected in expectations:
        command = "WPS_CHECK_PIN " + entered
        # hostapd and wpa_supplicant must give the same answer
        ap_answer = hapd.request(command).rstrip('\n')
        sta_answer = dev[0].request(command).rstrip('\n')
        if ap_answer != sta_answer:
            raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
        if ap_answer != expected:
            raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(ap_answer, expected))

    for wrong_length in ("12345", "123456789"):
        if "FAIL" not in hapd.request("WPS_CHECK_PIN " + wrong_length):
            raise Exception("Unexpected WPS_CHECK_PIN success")

    # PINs returned by "WPS_PIN get" must pass WPS_CHECK_PIN unchanged
    for _ in range(10):
        pin = dev[0].request("WPS_PIN get")
        checked = dev[0].request("WPS_CHECK_PIN " + pin).rstrip('\n')
        if pin != checked:
            raise Exception("Random PIN validation failed for " + pin)
1167
def test_ap_wps_wep_config(dev, apdev):
    """WPS 2.0 AP rejecting WEP configuration"""
    appin = "12345670"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   {"ssid": "test-wps-config",
                    "eap_server": "1",
                    "wps_state": "2",
                    "ap_pin": appin})
    hapd = hostapd.Hostapd(ifname)

    # Try to push a WEP configuration to the AP; no_wait=True is passed so
    # the outcome is observed through the AP's WPS-FAIL event below.
    dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN", "WEP",
                   "hello", no_wait=True)
    failure = hapd.wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL timed out")
    if "reason=2" not in failure:
        raise Exception("Unexpected reason code in WPS-FAIL")

    report = hapd.request("WPS_GET_STATUS")
    fixed_fields = (
        ("Last WPS result: Failed", "WPS failure result not shown correctly"),
        ("Failure Reason: WEP Prohibited", "Failure reason not reported correctly"),
    )
    for needle, complaint in fixed_fields:
        if needle not in report:
            raise Exception(complaint)
    peer_field = "Peer Address: " + dev[0].p2p_interface_addr()
    if peer_field not in report:
        raise Exception("Peer address not shown correctly")
1190
def test_ap_wps_wep_enroll(dev, apdev):
    """WPS 2.0 STA rejecting WEP configuration"""
    ifname = apdev[0]['ifname']
    # The AP skips building its own credential and hands out the contents of
    # the "wps-wep-cred" file instead.
    ap_params = {"ssid": "test-wps-wep",
                 "eap_server": "1",
                 "wps_state": "2",
                 "skip_cred_build": "1",
                 "extra_cred": "wps-wep-cred"}
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    hapd.request("WPS_PBC")
    dev[0].request("WPS_PBC")
    failure = dev[0].wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL event timed out")
    if not ("msg=12" in failure and "reason=2 (WEP Prohibited)" in failure):
        raise Exception("Unexpected WPS-FAIL event: " + failure)
1205
def test_ap_wps_ie_fragmentation(dev, apdev):
    """WPS AP using fragmented WPS IE"""
    ifname = apdev[0]['ifname']
    bssid = apdev[0]['bssid']
    device_name = "1234567890abcdef1234567890abcdef"
    # Long device description strings are configured on the AP; the check at
    # the end expects the WPS information to arrive in two vendor IEs.
    params = {
        "ssid": "test-wps-ie-fragmentation",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "device_name": device_name,
        "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
        "model_name": "1234567890abcdef1234567890abcdef",
        "model_number": "1234567890abcdef1234567890abcdef",
        "serial_number": "1234567890abcdef1234567890abcdef",
    }
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)

    hapd.request("WPS_PBC")
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    bss = dev[0].get_bss(bssid)
    name_ok = "wps_device_name" in bss and bss['wps_device_name'] == device_name
    if not name_ok:
        logger.info(bss)
        raise Exception("Device Name not received correctly")
    # Count vendor specific IEs (dd <len> 00:50:f2 type 04) in the hex dump
    wps_ies = re.findall("dd..0050f204", bss['ie'])
    if len(wps_ies) != 2:
        raise Exception("Unexpected number of WPS IEs")
1230
def get_psk(pskfile):
    """Read a PSK file and return its entries as a dict.

    Each entry line has the form "<addr> <psk>". Blank lines and lines
    starting with '#' (such as the "# WPA PSKs" header) are skipped. The line
    is split at the first space only, so a PSK/passphrase that itself
    contains spaces is kept intact.

    pskfile: path of the file to read
    Returns: dict mapping the address string to the PSK string
    Raises: ValueError if an entry line contains no space separator
    """
    psks = {}
    with open(pskfile, "r") as f:
        lines = f.read().splitlines()
    for line in lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        # Unpacking fails with ValueError for a line without a separator,
        # which keeps malformed files from being silently accepted.
        addr, psk = line.split(' ', 1)
        psks[addr] = psk
    return psks
1241
def test_ap_wps_per_station_psk(dev, apdev):
    """WPS PBC provisioning with per-station PSK"""
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    ssid = "wps"
    appin = "12345670"
    pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
    # Remove any leftover from an earlier run. Only a failing removal
    # (typically: the file does not exist) is ignored; other exceptions such
    # as KeyboardInterrupt are no longer swallowed.
    try:
        os.remove(pskfile)
    except OSError:
        pass

    try:
        with open(pskfile, "w") as f:
            f.write("# WPA PSKs\n")

        params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                   "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
                   "rsn_pairwise": "CCMP", "ap_pin": appin,
                   "wpa_psk_file": pskfile }
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)

        logger.info("First enrollee")
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (1)")

        logger.info("Second enrollee")
        hapd.request("WPS_PBC")
        dev[1].request("WPS_PBC")
        ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (2)")

        logger.info("External registrar")
        dev[2].wps_reg(apdev[0]['bssid'], appin)

        # Every station must have its own entry in the PSK file and no two
        # stations may share the same PSK.
        logger.info("Verifying PSK results")
        psks = get_psk(pskfile)
        if addr0 not in psks:
            raise Exception("No PSK recorded for sta0")
        if addr1 not in psks:
            raise Exception("No PSK recorded for sta1")
        if addr2 not in psks:
            raise Exception("No PSK recorded for sta2")
        if psks[addr0] == psks[addr1]:
            raise Exception("Same PSK recorded for sta0 and sta1")
        if psks[addr0] == psks[addr2]:
            raise Exception("Same PSK recorded for sta0 and sta2")
        if psks[addr1] == psks[addr2]:
            raise Exception("Same PSK recorded for sta1 and sta2")

        # Running sta0 again, now as an external registrar, must replace the
        # PSK it was given as an enrollee.
        dev[0].request("REMOVE_NETWORK all")
        logger.info("Second external registrar")
        dev[0].wps_reg(apdev[0]['bssid'], appin)
        psks2 = get_psk(pskfile)
        if addr0 not in psks2:
            raise Exception("No PSK recorded for sta0(reg)")
        if psks[addr0] == psks2[addr0]:
            raise Exception("Same PSK recorded for sta0(enrollee) and sta0(reg)")
    finally:
        os.remove(pskfile)
1307
def test_ap_wps_per_station_psk_failure(dev, apdev):
    """WPS PBC provisioning with per-station PSK (file not writable)"""
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    ssid = "wps"
    appin = "12345670"
    pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
    # Remove any leftover from an earlier run. Only a failing removal
    # (typically: the file does not exist) is ignored; other exceptions such
    # as KeyboardInterrupt are no longer swallowed.
    try:
        os.remove(pskfile)
    except OSError:
        pass

    try:
        with open(pskfile, "w") as f:
            f.write("# WPA PSKs\n")

        params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                   "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
                   "rsn_pairwise": "CCMP", "ap_pin": appin,
                   "wpa_psk_file": pskfile }
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)
        # After startup, point the AP at a PSK file path inside a directory
        # that does not exist, so that new per-station PSKs cannot be stored.
        if "FAIL" in hapd.request("SET wpa_psk_file /tmp/does/not/exists/ap_wps_per_enrollee_psk_failure.psk_file"):
            raise Exception("Failed to set wpa_psk_file")

        logger.info("First enrollee")
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (1)")

        logger.info("Second enrollee")
        hapd.request("WPS_PBC")
        dev[1].request("WPS_PBC")
        ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (2)")

        logger.info("External registrar")
        dev[2].wps_reg(apdev[0]['bssid'], appin)

        # The original file must still contain nothing but its header
        logger.info("Verifying PSK results")
        psks = get_psk(pskfile)
        if len(psks) > 0:
            raise Exception("PSK recorded unexpectedly")
    finally:
        os.remove(pskfile)
1356
def test_ap_wps_pin_request_file(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    pinfile = "/tmp/ap_wps_pin_request_file.log"
    # The log file is removed through sudo both before and after the test
    if os.path.exists(pinfile):
        subprocess.call(['sudo', 'rm', pinfile])

    ifname = apdev[0]['ifname']
    ap_params = {"ssid": "wps",
                 "eap_server": "1",
                 "wps_state": "2",
                 "wps_pin_requests": pinfile,
                 "wpa_passphrase": "12345678",
                 "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK",
                 "rsn_pairwise": "CCMP"}
    hostapd.add_ap(ifname, ap_params)
    hapd = hostapd.Hostapd(ifname)

    uuid = dev[0].get_status_field("uuid")
    pin = dev[0].wps_read_pin()
    try:
        # The PIN is not enabled on the AP, so the AP is expected to report
        # WPS-PIN-NEEDED for this enrollee.
        dev[0].request("WPS_PIN any " + pin)
        needed = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=15)
        if needed is None:
            raise Exception("PIN needed event not shown")
        if uuid not in needed:
            raise Exception("UUID mismatch")
        dev[0].request("WPS_CANCEL")

        with open(pinfile, "r") as f:
            entries = f.readlines()
        if not any(uuid in entry for entry in entries):
            raise Exception("PIN request entry not in the log file")
    finally:
        subprocess.call(['sudo', 'rm', pinfile])
1390
def test_ap_wps_auto_setup_with_config_file(dev, apdev):
    """WPS auto-setup with configuration file"""
    conffile = "/tmp/ap_wps_auto_setup_with_config_file.conf"
    ifname = apdev[0]['ifname']
    try:
        with open(conffile, "w") as f:
            f.write("".join([
                "driver=nl80211\n",
                "hw_mode=g\n",
                "channel=1\n",
                "ieee80211n=1\n",
                "interface=%s\n" % ifname,
                "ctrl_interface=/var/run/hostapd\n",
                "ssid=wps\n",
                "eap_server=1\n",
                "wps_state=1\n",
            ]))
        hostapd.add_bss('phy3', ifname, conffile)
        hapd = hostapd.Hostapd(ifname)
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
            raise Exception("Association with the AP timed out")

        # Re-read the configuration file after the WPS exchange and parse it
        # into name/value pairs.
        with open(conffile, "r") as f:
            content = f.read()
        vals = {}
        for entry in content.splitlines():
            if '=' in entry:
                name, value = entry.split('=', 1)
                vals[name] = value
            elif "# WPS configuration" not in entry:
                # Only the WPS marker comment may appear without '='
                raise Exception("Unexpected configuration line: " + entry)

        as_expected = (vals['ieee80211n'] == '1' and
                       vals['wps_state'] == '2' and
                       "WPA-PSK" in vals['wpa_key_mgmt'])
        if not as_expected:
            raise Exception("Incorrect configuration: " + str(vals))
    finally:
        subprocess.call(['sudo', 'rm', conffile])
1429
def test_ap_wps_pbc_timeout(dev, apdev, params):
    """wpa_supplicant PBC walk time [long]"""
    run_long_tests = params['long']
    if not run_long_tests:
        logger.info("Skip test case with long duration due to --long not specified")
        return "skip"

    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   {"ssid": "test-wps",
                    "eap_server": "1",
                    "wps_state": "1"})
    hapd = hostapd.Hostapd(ifname)

    # PBC is started only on the station, never on the AP, so the station is
    # expected to give up and report WPS-TIMEOUT.
    logger.info("Start WPS_PBC and wait for PBC walk time expiration")
    response = dev[0].request("WPS_PBC")
    if "OK" not in response:
        raise Exception("WPS_PBC failed")
    if dev[0].wait_event(["WPS-TIMEOUT"], timeout=150) is None:
        raise Exception("WPS-TIMEOUT not reported")
1445
def add_ssdp_ap(ifname, ap_uuid):
    """Start the WPS AP used by the SSDP tests.

    ifname: AP interface name passed to hostapd.add_ap()
    ap_uuid: UUID string configured as the AP's "uuid" parameter

    The AP is configured with UPnP on the "lo" interface and a fixed set of
    device description values.
    """
    config = {
        # Network and security
        "ssid": "wps-ssdp",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        # WPS device attributes
        "device_name": "Wireless AP",
        "manufacturer": "Company",
        "model_name": "WAP",
        "model_number": "123",
        "serial_number": "12345",
        "device_type": "6-0050F204-1",
        "os_version": "01020300",
        "config_methods": "label push_button",
        "ap_pin": "12345670",
        "uuid": ap_uuid,
        # UPnP
        "upnp_iface": "lo",
        "friendly_name": "WPS Access Point",
        "manufacturer_url": "http://www.example.com/",
        "model_description": "Wireless Access Point",
        "model_url": "http://www.example.com/model/",
        "upc": "123456789012",
    }
    hostapd.add_ap(ifname, config)
1464
def ssdp_send(msg, no_recv=False):
    """Send one datagram to the SSDP multicast address and read the reply.

    msg: complete payload to send to 239.255.255.250:1900
    no_recv: when True, return right after sending without reading a reply

    Returns the received datagram (at most 1000 bytes), or None when no_recv
    is set. Exceptions from the socket calls, including the timeout raised by
    recv() when nothing arrives, propagate to the caller.

    Note: the process-wide default socket timeout is set to one second as a
    side effect, as before.
    """
    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # Close the socket on every path; it used to be leaked on each call and
    # whenever recv() timed out.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))
        sock.sendto(msg, ("239.255.255.250", 1900))
        if no_recv:
            return None
        return sock.recv(1000)
    finally:
        sock.close()
1475
def ssdp_send_msearch(st):
    """Send an SSDP M-SEARCH request for search target st.

    The request uses MX: 1 and is sent through ssdp_send(); whatever
    ssdp_send() returns is returned to the caller.
    """
    header_lines = [
        'M-SEARCH * HTTP/1.1',
        'HOST: 239.255.255.250:1900',
        'MX: 1',
        'MAN: "ssdp:discover"',
        'ST: ' + st,
    ]
    # The two trailing empty items terminate the last header line and add the
    # empty line that ends the header block.
    request = '\r\n'.join(header_lines + ['', ''])
    return ssdp_send(request)
1485
def test_ap_wps_ssdp_msearch(dev, apdev):
    """WPS AP and SSDP M-SEARCH messages"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    def build(header_lines):
        # Join with CRLF and finish the header block with an empty line
        return '\r\n'.join(header_lines + ['', ''])

    # Header names in mixed case
    ssdp_send(build([
        'M-SEARCH * HTTP/1.1',
        'Host: 239.255.255.250:1900',
        'Mx: 1',
        'Man: "ssdp:discover"',
        'St: urn:schemas-wifialliance-org:device:WFADevice:1']))

    # Lower-case header names with extra tabs and spaces around the values
    ssdp_send(build([
        'M-SEARCH * HTTP/1.1',
        'host:\t239.255.255.250:1900\t\t\t\t \t\t',
        'mx: \t1\t\t ',
        'man: \t \t "ssdp:discover" ',
        'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t']))

    # One M-SEARCH per search target, each reading a response
    search_targets = (
        "ssdp:all",
        "upnp:rootdevice",
        "uuid:" + ap_uuid,
        "urn:schemas-wifialliance-org:service:WFAWLANConfig:1",
        "urn:schemas-wifialliance-org:device:WFADevice:1",
    )
    for target in search_targets:
        ssdp_send_msearch(target)

    # MX of 130; the response is not read for this request
    ssdp_send(build([
        'M-SEARCH * HTTP/1.1',
        'HOST:\t239.255.255.250:1900',
        'MAN: "ssdp:discover"',
        'MX: 130',
        'ST: urn:schemas-wifialliance-org:device:WFADevice:1']),
              no_recv=True)
1523
1524 def test_ap_wps_ssdp_invalid_msearch(dev, apdev):
1525 """WPS AP and invalid SSDP M-SEARCH messages"""
1526 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1527 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1528
1529 socket.setdefaulttimeout(1)
1530 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1531 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1532 sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1533 sock.bind(("127.0.0.1", 0))
1534
1535 logger.debug("Missing MX")
1536 msg = '\r\n'.join([
1537 'M-SEARCH * HTTP/1.1',
1538 'HOST: 239.255.255.250:1900',
1539 'MAN: "ssdp:discover"',
1540 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1541 '', ''])
1542 sock.sendto(msg, ("239.255.255.250", 1900))
1543
1544 logger.debug("Negative MX")
1545 msg = '\r\n'.join([
1546 'M-SEARCH * HTTP/1.1',
1547 'HOST: 239.255.255.250:1900',
1548 'MX: -1',
1549 'MAN: "ssdp:discover"',
1550 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1551 '', ''])
1552 sock.sendto(msg, ("239.255.255.250", 1900))
1553
1554 logger.debug("Invalid MX")
1555 msg = '\r\n'.join([
1556 'M-SEARCH * HTTP/1.1',
1557 'HOST: 239.255.255.250:1900',
1558 'MX; 1',
1559 'MAN: "ssdp:discover"',
1560 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1561 '', ''])
1562 sock.sendto(msg, ("239.255.255.250", 1900))
1563
1564 logger.debug("Missing MAN")
1565 msg = '\r\n'.join([
1566 'M-SEARCH * HTTP/1.1',
1567 'HOST: 239.255.255.250:1900',
1568 'MX: 1',
1569 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1570 '', ''])
1571 sock.sendto(msg, ("239.255.255.250", 1900))
1572
1573 logger.debug("Invalid MAN")
1574 msg = '\r\n'.join([
1575 'M-SEARCH * HTTP/1.1',
1576 'HOST: 239.255.255.250:1900',
1577 'MX: 1',
1578 'MAN: foo',
1579 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1580 '', ''])
1581 sock.sendto(msg, ("239.255.255.250", 1900))
1582 msg = '\r\n'.join([
1583 'M-SEARCH * HTTP/1.1',
1584 'HOST: 239.255.255.250:1900',
1585 'MX: 1',
1586 'MAN; "ssdp:discover"',
1587 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1588 '', ''])
1589 sock.sendto(msg, ("239.255.255.250", 1900))
1590
1591 logger.debug("Missing HOST")
1592 msg = '\r\n'.join([
1593 'M-SEARCH * HTTP/1.1',
1594 'MAN: "ssdp:discover"',
1595 'MX: 1',
1596 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1597 '', ''])
1598 sock.sendto(msg, ("239.255.255.250", 1900))
1599
1600 logger.debug("Missing ST")
1601 msg = '\r\n'.join([
1602 'M-SEARCH * HTTP/1.1',
1603 'HOST: 239.255.255.250:1900',
1604 'MAN: "ssdp:discover"',
1605 'MX: 1',
1606 '', ''])
1607 sock.sendto(msg, ("239.255.255.250", 1900))
1608
1609 logger.debug("Mismatching ST")
1610 msg = '\r\n'.join([
1611 'M-SEARCH * HTTP/1.1',
1612 'HOST: 239.255.255.250:1900',
1613 'MAN: "ssdp:discover"',
1614 'MX: 1',
1615 'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d',
1616 '', ''])
1617 sock.sendto(msg, ("239.255.255.250", 1900))
1618 msg = '\r\n'.join([
1619 'M-SEARCH * HTTP/1.1',
1620 'HOST: 239.255.255.250:1900',
1621 'MAN: "ssdp:discover"',
1622 'MX: 1',
1623 'ST: foo:bar',
1624 '', ''])
1625 sock.sendto(msg, ("239.255.255.250", 1900))
1626 msg = '\r\n'.join([
1627 'M-SEARCH * HTTP/1.1',
1628 'HOST: 239.255.255.250:1900',
1629 'MAN: "ssdp:discover"',
1630 'MX: 1',
1631 'ST: foobar',
1632 '', ''])
1633 sock.sendto(msg, ("239.255.255.250", 1900))
1634
1635 logger.debug("Invalid ST")
1636 msg = '\r\n'.join([
1637 'M-SEARCH * HTTP/1.1',
1638 'HOST: 239.255.255.250:1900',
1639 'MAN: "ssdp:discover"',
1640 'MX: 1',
1641 'ST; urn:schemas-wifialliance-org:device:WFADevice:1',
1642 '', ''])
1643 sock.sendto(msg, ("239.255.255.250", 1900))
1644
1645 logger.debug("Invalid M-SEARCH")
1646 msg = '\r\n'.join([
1647 'M+SEARCH * HTTP/1.1',
1648 'HOST: 239.255.255.250:1900',
1649 'MAN: "ssdp:discover"',
1650 'MX: 1',
1651 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1652 '', ''])
1653 sock.sendto(msg, ("239.255.255.250", 1900))
1654 msg = '\r\n'.join([
1655 'M-SEARCH-* HTTP/1.1',
1656 'HOST: 239.255.255.250:1900',
1657 'MAN: "ssdp:discover"',
1658 'MX: 1',
1659 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1660 '', ''])
1661 sock.sendto(msg, ("239.255.255.250", 1900))
1662
1663 logger.debug("Invalid message format")
1664 sock.sendto("NOTIFY * HTTP/1.1", ("239.255.255.250", 1900))
1665 msg = '\r'.join([
1666 'M-SEARCH * HTTP/1.1',
1667 'HOST: 239.255.255.250:1900',
1668 'MAN: "ssdp:discover"',
1669 'MX: 1',
1670 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1671 '', ''])
1672 sock.sendto(msg, ("239.255.255.250", 1900))
1673
1674 try:
1675 r = sock.recv(1000)
1676 raise Exception("Unexpected M-SEARCH response: " + r)
1677 except socket.timeout:
1678 pass
1679
1680 logger.debug("Valid M-SEARCH")
1681 msg = '\r\n'.join([
1682 'M-SEARCH * HTTP/1.1',
1683 'HOST: 239.255.255.250:1900',
1684 'MAN: "ssdp:discover"',
1685 'MX: 1',
1686 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1687 '', ''])
1688 sock.sendto(msg, ("239.255.255.250", 1900))
1689
1690 try:
1691 r = sock.recv(1000)
1692 pass
1693 except socket.timeout:
1694 raise Exception("No SSDP response")
1695
def test_ap_wps_ssdp_burst(dev, apdev):
    """WPS AP and SSDP burst"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    ssdp_group = ("239.255.255.250", 1900)
    msg = '\r\n'.join([
            'M-SEARCH * HTTP/1.1',
            'HOST: 239.255.255.250:1900',
            'MAN: "ssdp:discover"',
            'MX: 1',
            'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
            '', ''])
    # The recv() loops below use this one second default timeout to detect
    # that no more responses are coming.
    socket.setdefaulttimeout(1)

    # First burst: send 25 searches, require every received datagram to be a
    # 200 OK response and at least 20 responses in total.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))
        for i in range(0, 25):
            sock.sendto(msg, ssdp_group)
        resp = 0
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                break
            if not r.startswith("HTTP/1.1 200 OK\r\n"):
                raise Exception("Unexpected message: " + r)
            resp += 1
    finally:
        # Release the socket explicitly instead of relying on garbage
        # collection, also when the checks above raise.
        sock.close()
    if resp < 20:
        raise Exception("Too few SSDP responses")

    # Second burst from a fresh socket: it is enough to see one response
    # that mentions the AP UUID; a timeout before that is a failure.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))
        for i in range(0, 25):
            sock.sendto(msg, ssdp_group)
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                raise Exception("No SSDP response")
            if ap_uuid in r:
                break
    finally:
        sock.close()
1740
def ssdp_get_location(uuid):
    """Return the value of the LOCATION header for the given device UUID.

    The response text is obtained from ssdp_send_msearch() using the search
    target "uuid:<uuid>". The header name is matched case-insensitively and
    the value is returned with surrounding whitespace removed.

    Raises Exception if no line of the response starts with "location:".
    """
    response = ssdp_send_msearch("uuid:" + uuid)
    for header in response.splitlines():
        if not header.lower().startswith("location:"):
            continue
        # Cut at the first colon only; the value may contain further colons.
        return header.partition(':')[2].strip()
    raise Exception("No UPnP location found")
1751
def upnp_get_urls(location):
    """Fetch a UPnP device description and return its service URLs.

    location is the URL of the device description XML document. The first
    device/serviceList/service element (urn:schemas-upnp-org:device-1-0
    namespace) is used and its SCPDURL, controlURL and eventSubURL values are
    resolved against location.

    Returns a dict with the keys 'scpd_url', 'control_url' and
    'event_sub_url'.

    Raises Exception if the document has no service element or if one of the
    three URL elements is missing from it.
    """
    conn = urllib.urlopen(location)
    try:
        tree = ET.parse(conn)
    finally:
        # Do not leave the HTTP response open after the document is parsed
        # (or if parsing fails).
        conn.close()
    root = tree.getroot()
    urn = '{urn:schemas-upnp-org:device-1-0}'
    service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
    if service is None:
        raise Exception("No UPnP service found in device description")
    res = {}
    for key, tag in (('scpd_url', 'SCPDURL'),
                     ('control_url', 'controlURL'),
                     ('event_sub_url', 'eventSubURL')):
        elem = service.find(urn + tag)
        if elem is None:
            raise Exception("UPnP service description is missing " + tag)
        res[key] = urlparse.urljoin(location, elem.text)
    return res
1763
def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
    """POST a parameterless WFAWLANConfig SOAP action and return the response.

    conn is an object providing request() and getresponse() (the tests pass
    an httplib.HTTPConnection), path is the request target and action is the
    name of the SOAP action element placed in the envelope body.

    When include_soap_action is true, the standard SOAPAction header for
    action is sent. Otherwise the header is omitted unless
    soap_action_override is given, in which case that value is sent verbatim.
    soap_action_override is ignored while include_soap_action is true.

    Returns whatever conn.getresponse() returns.
    """
    soap_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
    wfa_ns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
    ET.register_namespace('soapenv', soap_ns)
    ET.register_namespace('wfa', wfa_ns)

    # Envelope -> Body -> <action/> with no arguments
    envelope = ET.Element("{%s}Envelope" % soap_ns,
                          attrib={'{%s}encodingStyle' % soap_ns:
                                  'http://schemas.xmlsoap.org/soap/encoding/'})
    soap_body = ET.SubElement(envelope, "{%s}Body" % soap_ns)
    ET.SubElement(soap_body, "{%s}%s" % (wfa_ns, action))
    payload = StringIO.StringIO()
    ET.ElementTree(envelope).write(payload, xml_declaration=True,
                                   encoding='utf-8')

    # Decide on the SOAPAction header value first, then build the headers.
    soap_action = None
    if include_soap_action:
        soap_action = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
    elif soap_action_override:
        soap_action = soap_action_override

    headers = { "Content-type": 'text/xml; charset="utf-8"' }
    if soap_action is not None:
        headers["SOAPAction"] = soap_action

    conn.request("POST", path, payload.getvalue(), headers)
    return conn.getresponse()
1785
def test_ap_wps_upnp(dev, apdev):
    """WPS AP and UPnP operations"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    location = ssdp_get_location(ap_uuid)
    urls = upnp_get_urls(location)
    ctrl_path = urlparse.urlparse(urls['control_url']).path

    def expect_status(response, expected):
        # Shared check for all HTTPConnection based requests below
        if response.status != expected:
            raise Exception("Unexpected HTTP response: %s" % response.status)
        return response

    # Plain GET requests: the SCPD document is fetched and read, and an
    # unknown page must result in 404.
    urllib.urlopen(urls['scpd_url']).read()
    if urllib.urlopen(urlparse.urljoin(location, "unknown.html")).getcode() != 404:
        raise Exception("Unexpected HTTP response to GET unknown URL")

    http = httplib.HTTPConnection(urlparse.urlparse(location).netloc)
    #http.set_debuglevel(1)

    # Raw requests with an empty payload: (method, target, headers, status)
    wfa_headers = { "Content-type": 'text/xml; charset="utf-8"',
                    "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
    unknown_headers = { "Content-type": 'text/xml; charset="utf-8"',
                        "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
    raw_requests = [ ("POST", "hello", wfa_headers, 404),
                     ("UNKNOWN", "hello", wfa_headers, 501),
                     ("POST", ctrl_path, unknown_headers, 401) ]
    for method, target, hdrs, status in raw_requests:
        http.request(method, target, "\r\n\r\n", hdrs)
        expect_status(http.getresponse(), status)

    logger.debug("GetDeviceInfo without SOAPAction header")
    expect_status(upnp_soap_action(http, ctrl_path, "GetDeviceInfo",
                                   include_soap_action=False), 401)

    logger.debug("GetDeviceInfo with invalid SOAPAction header")
    invalid_soap_actions = [
        "foo",
        "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
        '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
        '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice' ]
    for value in invalid_soap_actions:
        expect_status(upnp_soap_action(http, ctrl_path, "GetDeviceInfo",
                                       include_soap_action=False,
                                       soap_action_override=value), 401)

    # Valid GetDeviceInfo must succeed and carry the NewDeviceInfo element
    info = expect_status(upnp_soap_action(http, ctrl_path, "GetDeviceInfo"),
                         200).read()
    if "NewDeviceInfo" not in info:
        raise Exception("Unexpected GetDeviceInfo response")

    # Actions sent without any parameters: (log message, action, status)
    bare_actions = [
        ("PutMessage without required parameters", "PutMessage", 600),
        ("PutWLANResponse without required parameters", "PutWLANResponse", 600),
        ("SetSelectedRegistrar from unregistered ER", "SetSelectedRegistrar", 501),
        ("Unknown action", "Unknown", 401) ]
    for description, action, status in bare_actions:
        logger.debug(description)
        expect_status(upnp_soap_action(http, ctrl_path, action), status)
1867
1868 def test_ap_wps_upnp_subscribe(dev, apdev):
1869 """WPS AP and UPnP event subscription"""
1870 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1871 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1872
1873 location = ssdp_get_location(ap_uuid)
1874 urls = upnp_get_urls(location)
1875 eventurl = urlparse.urlparse(urls['event_sub_url'])
1876
1877 url = urlparse.urlparse(location)
1878 conn = httplib.HTTPConnection(url.netloc)
1879 #conn.set_debuglevel(1)
1880 headers = { "callback": '<http://127.0.0.1:12345/event>',
1881 "timeout": "Second-1234" }
1882 conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers)
1883 resp = conn.getresponse()
1884 if resp.status != 412:
1885 raise Exception("Unexpected HTTP response: %s" % resp.status)
1886
1887 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1888 resp = conn.getresponse()
1889 if resp.status != 412:
1890 raise Exception("Unexpected HTTP response: %s" % resp.status)
1891
1892 headers = { "NT": "upnp:event",
1893 "timeout": "Second-1234" }
1894 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1895 resp = conn.getresponse()
1896 if resp.status != 412:
1897 raise Exception("Unexpected HTTP response: %s" % resp.status)
1898
1899 headers = { "callback": '<http://127.0.0.1:12345/event>',
1900 "NT": "upnp:foobar",
1901 "timeout": "Second-1234" }
1902 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1903 resp = conn.getresponse()
1904 if resp.status != 400:
1905 raise Exception("Unexpected HTTP response: %s" % resp.status)
1906
1907 logger.debug("Valid subscription")
1908 headers = { "callback": '<http://127.0.0.1:12345/event>',
1909 "NT": "upnp:event",
1910 "timeout": "Second-1234" }
1911 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1912 resp = conn.getresponse()
1913 if resp.status != 200:
1914 raise Exception("Unexpected HTTP response: %s" % resp.status)
1915 sid = resp.getheader("sid")
1916 logger.debug("Subscription SID " + sid)
1917
1918 logger.debug("Invalid re-subscription")
1919 headers = { "NT": "upnp:event",
1920 "sid": "123456734567854",
1921 "timeout": "Second-1234" }
1922 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1923 resp = conn.getresponse()
1924 if resp.status != 400:
1925 raise Exception("Unexpected HTTP response: %s" % resp.status)
1926
1927 logger.debug("Invalid re-subscription")
1928 headers = { "NT": "upnp:event",
1929 "sid": "uuid:123456734567854",
1930 "timeout": "Second-1234" }
1931 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1932 resp = conn.getresponse()
1933 if resp.status != 400:
1934 raise Exception("Unexpected HTTP response: %s" % resp.status)
1935
1936 logger.debug("Invalid re-subscription")
1937 headers = { "callback": '<http://127.0.0.1:12345/event>',
1938 "NT": "upnp:event",
1939 "sid": sid,
1940 "timeout": "Second-1234" }
1941 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1942 resp = conn.getresponse()
1943 if resp.status != 400:
1944 raise Exception("Unexpected HTTP response: %s" % resp.status)
1945
1946 logger.debug("SID mismatch in re-subscription")
1947 headers = { "NT": "upnp:event",
1948 "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
1949 "timeout": "Second-1234" }
1950 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1951 resp = conn.getresponse()
1952 if resp.status != 412:
1953 raise Exception("Unexpected HTTP response: %s" % resp.status)
1954
1955 logger.debug("Valid re-subscription")
1956 headers = { "NT": "upnp:event",
1957 "sid": sid,
1958 "timeout": "Second-1234" }
1959 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1960 resp = conn.getresponse()
1961 if resp.status != 200:
1962 raise Exception("Unexpected HTTP response: %s" % resp.status)
1963 sid2 = resp.getheader("sid")
1964 logger.debug("Subscription SID " + sid2)
1965
1966 if sid != sid2:
1967 raise Exception("Unexpected SID change")
1968
1969 logger.debug("Valid re-subscription")
1970 headers = { "NT": "upnp:event",
1971 "sid": "uuid: \t \t" + sid.split(':')[1],
1972 "timeout": "Second-1234" }
1973 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1974 resp = conn.getresponse()
1975 if resp.status != 200:
1976 raise Exception("Unexpected HTTP response: %s" % resp.status)
1977
1978 logger.debug("Invalid unsubscription")
1979 headers = { "sid": sid }
1980 conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers)
1981 resp = conn.getresponse()
1982 if resp.status != 412:
1983 raise Exception("Unexpected HTTP response: %s" % resp.status)
1984 headers = { "foo": "bar" }
1985 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1986 resp = conn.getresponse()
1987 if resp.status != 412:
1988 raise Exception("Unexpected HTTP response: %s" % resp.status)
1989
1990 logger.debug("Valid unsubscription")
1991 headers = { "sid": sid }
1992 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1993 resp = conn.getresponse()
1994 if resp.status != 200:
1995 raise Exception("Unexpected HTTP response: %s" % resp.status)
1996
1997 logger.debug("Unsubscription for not existing SID")
1998 headers = { "sid": sid }
1999 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
2000 resp = conn.getresponse()
2001 if resp.status != 412:
2002 raise Exception("Unexpected HTTP response: %s" % resp.status)
2003
2004 logger.debug("Invalid unsubscription")
2005 headers = { "sid": " \t \tfoo" }
2006 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
2007 resp = conn.getresponse()
2008 if resp.status != 400:
2009 raise Exception("Unexpected HTTP response: %s" % resp.status)
2010
2011 logger.debug("Invalid unsubscription")
2012 headers = { "sid": "uuid:\t \tfoo" }
2013 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
2014 resp = conn.getresponse()
2015 if resp.status != 400:
2016 raise Exception("Unexpected HTTP response: %s" % resp.status)
2017
2018 logger.debug("Invalid unsubscription")
2019 headers = { "NT": "upnp:event",
2020 "sid": sid }
2021 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
2022 resp = conn.getresponse()
2023 if resp.status != 400:
2024 raise Exception("Unexpected HTTP response: %s" % resp.status)
2025 headers = { "callback": '<http://127.0.0.1:12345/event>',
2026 "sid": sid }
2027 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
2028 resp = conn.getresponse()
2029 if resp.status != 400:
2030 raise Exception("Unexpected HTTP response: %s" % resp.status)
2031
2032 logger.debug("Valid subscription with multiple callbacks")
2033 headers = { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
2034 "NT": "upnp:event",
2035 "timeout": "Second-1234" }
2036 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
2037 resp = conn.getresponse()
2038 if resp.status != 200:
2039 raise Exception("Unexpected HTTP response: %s" % resp.status)
2040 sid = resp.getheader("sid")
2041 logger.debug("Subscription SID " + sid)