]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/test_ap_wps.py
tests: Add option for running test cases that take a long time
[thirdparty/hostap.git] / tests / hwsim / test_ap_wps.py
1 # WPS tests
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import subprocess
10 import logging
11 logger = logging.getLogger()
12 import re
13 import socket
14 import httplib
15 import urlparse
16 import urllib
17 import xml.etree.ElementTree as ET
18 import StringIO
19
20 import hwsim_utils
21 import hostapd
22
def test_ap_wps_init(dev, apdev):
    """Initial AP configuration with first WPS Enrollee"""
    ssid = "test-wps"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    pbc_status = hapd.request("WPS_GET_STATUS")
    if "PBC Status: Active" not in pbc_status:
        raise Exception("PBC status not shown correctly")

    # Add two network blocks before running WPS; both are enabled with
    # "no-connect" and are counted in the final network block check.
    sta = dev[0]
    net_id = sta.add_network()
    sta.set_network_quoted(net_id, "ssid", "home")
    sta.set_network_quoted(net_id, "psk", "12345678")
    sta.request("ENABLE_NETWORK %s no-connect" % net_id)

    net_id = sta.add_network()
    sta.set_network_quoted(net_id, "ssid", "home2")
    sta.set_network(net_id, "bssid", "00:11:22:33:44:55")
    sta.set_network(net_id, "key_mgmt", "NONE")
    sta.request("ENABLE_NETWORK %s no-connect" % net_id)

    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Verify the station side of the connection
    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if status['bssid'] != apdev[0]['bssid']:
        raise Exception("Not fully connected")
    sta_checks = [ ('ssid', ssid, "Unexpected SSID"),
                   ('pairwise_cipher', 'CCMP',
                    "Unexpected encryption configuration"),
                   ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt") ]
    for field, expected, msg in sta_checks:
        if status[field] != expected:
            raise Exception(msg)

    # Verify what the AP reports about the completed WPS run
    wps_status = hapd.request("WPS_GET_STATUS")
    if "PBC Status: Disabled" not in wps_status:
        raise Exception("PBC status not shown correctly")
    if "Last WPS result: Success" not in wps_status:
        raise Exception("Last WPS result not shown correctly")
    peer = "Peer Address: " + sta.p2p_interface_addr()
    if peer not in wps_status:
        raise Exception("Peer address not shown correctly")

    # Verify the configuration the AP ended up with
    conf = hapd.request("GET_CONFIG")
    conf_checks = [ ("wps_state=configured",
                     "AP not in WPS configured state"),
                    ("rsn_pairwise_cipher=CCMP TKIP",
                     "Unexpected rsn_pairwise_cipher"),
                    ("wpa_pairwise_cipher=CCMP TKIP",
                     "Unexpected wpa_pairwise_cipher"),
                    ("group_cipher=TKIP", "Unexpected group_cipher") ]
    for needle, msg in conf_checks:
        if needle not in conf:
            raise Exception(msg)

    # Two blocks were added above; a third one is expected after WPS
    if len(sta.list_networks()) != 3:
        raise Exception("Unexpected number of network blocks")
78
def test_ap_wps_init_2ap_pbc(dev, apdev):
    """Initial two-radio AP configuration with first WPS PBC Enrollee"""
    ssid = "test-wps"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    # PBC is requested through the first AP interface only; both BSSes are
    # then expected to show the WPS-PBC flag in scan results.
    hapd.request("WPS_PBC")
    dev[0].scan(freq="2412")
    bss = dev[0].get_bss(apdev[0]['bssid'])
    if "[WPS-PBC]" not in bss['flags']:
        raise Exception("WPS-PBC flag missing from AP1")
    bss = dev[0].get_bss(apdev[1]['bssid'])
    if "[WPS-PBC]" not in bss['flags']:
        raise Exception("WPS-PBC flag missing from AP2")
    dev[0].dump_monitor()
    dev[0].request("SET wps_cred_processing 2")
    dev[0].request("WPS_PBC")
    ev = dev[0].wait_event(["WPS-CRED-RECEIVED"], timeout=30)
    # Restore the setting before evaluating the event so that it is reset
    # even when the checks below raise.
    dev[0].request("SET wps_cred_processing 0")
    if ev is None:
        raise Exception("WPS cred event not seen")
    if "100e" not in ev:
        raise Exception("WPS attributes not included in the cred event")
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # After provisioning, a fresh scan from another station must not show
    # the PBC flag on either BSS.
    dev[1].scan(freq="2412")
    bss = dev[1].get_bss(apdev[0]['bssid'])
    if "[WPS-PBC]" in bss['flags']:
        raise Exception("WPS-PBC flag not cleared from AP1")
    bss = dev[1].get_bss(apdev[1]['bssid'])
    if "[WPS-PBC]" in bss['flags']:
        raise Exception("WPS-PBC flag not cleared from AP2")
115
def test_ap_wps_init_2ap_pin(dev, apdev):
    """Initial two-radio AP configuration with first WPS PIN Enrollee"""
    ssid = "test-wps"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    # The PIN is enabled through the first AP interface only; both BSSes
    # are then expected to show the WPS-AUTH flag in scan results.
    hapd.request("WPS_PIN any " + pin)
    dev[0].scan(freq="2412")
    bss = dev[0].get_bss(apdev[0]['bssid'])
    if "[WPS-AUTH]" not in bss['flags']:
        raise Exception("WPS-AUTH flag missing from AP1")
    bss = dev[0].get_bss(apdev[1]['bssid'])
    if "[WPS-AUTH]" not in bss['flags']:
        raise Exception("WPS-AUTH flag missing from AP2")
    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # After provisioning, a fresh scan from another station must not show
    # the AUTH flag on either BSS.
    dev[1].scan(freq="2412")
    bss = dev[1].get_bss(apdev[0]['bssid'])
    if "[WPS-AUTH]" in bss['flags']:
        raise Exception("WPS-AUTH flag not cleared from AP1")
    bss = dev[1].get_bss(apdev[1]['bssid'])
    if "[WPS-AUTH]" in bss['flags']:
        raise Exception("WPS-AUTH flag not cleared from AP2")
146
def test_ap_wps_init_through_wps_config(dev, apdev):
    """Initial AP configuration using wps_config command"""
    ssid = "test-wps-init-config"
    passphrase = "12345678"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
    hapd = hostapd.Hostapd(ifname)
    # WPS_CONFIG takes the SSID and the passphrase as hexdumps
    cmd = "WPS_CONFIG %s WPA2PSK CCMP %s" % (ssid.encode("hex"),
                                            passphrase.encode("hex"))
    if "FAIL" in hapd.request(cmd):
        raise Exception("WPS_CONFIG command failed")
    if hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5) is None:
        raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")
    # The AP needs a moment to update its Beacon and Probe Response frames.
    # Wait before starting the scan; otherwise obsolete scan results could
    # be fetched, which would add an extra five second wait to the test.
    hapd.ping()
    time.sleep(0.2)
    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", proto="WPA2",
                   pairwise="CCMP", group="CCMP")
165
def test_ap_wps_conf(dev, apdev):
    """WPS PBC provisioning with configured AP"""
    ssid = "test-wps-conf"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta = dev[0]
    sta.dump_monitor()
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Verify the resulting connection field by field
    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if status['bssid'] != apdev[0]['bssid']:
        raise Exception("Unexpected BSSID")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    for cipher in ('pairwise_cipher', 'group_cipher'):
        if status[cipher] != 'CCMP':
            raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # The AP's STA entry is expected to carry the WPS device name
    sta_info = hapd.get_sta(sta.p2p_interface_addr())
    if not ('wpsDeviceName' in sta_info and
            sta_info['wpsDeviceName'] == "Device A"):
        raise Exception("Device name not available in STA command")
196
def test_ap_wps_twice(dev, apdev):
    """WPS provisioning with twice to change passphrase"""
    ssid = "test-wps-twice"
    ifname = apdev[0]['ifname']
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wpa_passphrase": "12345678", "wpa": "2",
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    sta = dev[0]

    # First round: provision with the initial passphrase
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta.dump_monitor()
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
    sta.request("DISCONNECT")

    # Second round: same SSID, different passphrase
    logger.info("Restart AP with different passphrase and re-run WPS")
    hostapd.HostapdGlobal().remove(ifname)
    params['wpa_passphrase'] = 'another passphrase'
    hostapd.add_ap(ifname, params)
    hapd = hostapd.Hostapd(ifname)
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta.dump_monitor()
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # The second run must not leave more than one network block behind
    if len(sta.list_networks()) > 1:
        raise Exception("Unexpected duplicated network block present")
230
def test_ap_wps_incorrect_pin(dev, apdev):
    """WPS PIN provisioning with incorrect PIN"""
    ssid = "test-wps-incorrect-pin"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    sta = dev[0]

    # Attempt 1: the station PIN differs from the AP-side PIN in every
    # position; the failure is expected to be reported with msg=8.
    logger.info("WPS provisioning attempt 1")
    hapd.request("WPS_PIN any 12345670")
    sta.dump_monitor()
    sta.request("WPS_PIN any 55554444")
    failure = sta.wait_event(["WPS-FAIL"], timeout=30)
    if failure is None:
        raise Exception("WPS operation timed out")
    if "config_error=18" not in failure:
        raise Exception("Incorrect config_error reported")
    if "msg=8" not in failure:
        raise Exception("PIN error detected on incorrect message")
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
        raise Exception("Timeout on disconnection event")
    sta.request("WPS_CANCEL")
    # if a scan was in progress, wait for it to complete before trying WPS again
    sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)

    wps_status = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in wps_status:
        raise Exception("WPS failure result not shown correctly")

    # Attempt 2: the station PIN shares its first four digits with the
    # AP-side PIN; the failure is expected to be reported with msg=10.
    logger.info("WPS provisioning attempt 2")
    hapd.request("WPS_PIN any 12345670")
    sta.dump_monitor()
    sta.request("WPS_PIN any 12344444")
    failure = sta.wait_event(["WPS-FAIL"], timeout=30)
    if failure is None:
        raise Exception("WPS operation timed out")
    if "config_error=18" not in failure:
        raise Exception("Incorrect config_error reported")
    if "msg=10" not in failure:
        raise Exception("PIN error detected on incorrect message")
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
        raise Exception("Timeout on disconnection event")
276
def test_ap_wps_conf_pin(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    ssid = "test-wps-conf-pin"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    pin_cmd = "WPS_PIN any " + pin
    hapd.request(pin_cmd)
    dev[0].dump_monitor()
    dev[0].request(pin_cmd)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Verify the first station's connection
    status = dev[0].get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if status['bssid'] != apdev[0]['bssid']:
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    for cipher in ('pairwise_cipher', 'group_cipher'):
        if status[cipher] != 'CCMP':
            raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # A second station must see the AUTH flag cleared, and reusing the
    # same PIN must end in M2D instead of a connection.
    dev[1].scan(freq="2412")
    flags = dev[1].get_bss(apdev[0]['bssid'])['flags']
    if "[WPS-AUTH]" in flags:
        raise Exception("WPS-AUTH flag not cleared")
    logger.info("Try to connect from another station using the same PIN")
    dev[1].request(pin_cmd)
    event = dev[1].wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"], timeout=30)
    if event is None:
        raise Exception("Operation timed out")
    if "WPS-M2D" not in event:
        raise Exception("Unexpected WPS operation started")
314
def test_ap_wps_conf_pin_2sta(dev, apdev):
    """Two stations trying to use WPS PIN at the same time"""
    ssid = "test-wps-conf-pin2"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    # Both stations use the same PIN value; the AP gets one PIN entry per
    # station UUID. (A previously defined second PIN was never used and has
    # been dropped.)
    pin = "12345670"
    hapd.request("WPS_PIN " + dev[0].get_status_field("uuid") + " " + pin)
    hapd.request("WPS_PIN " + dev[1].get_status_field("uuid") + " " + pin)
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    # Start both enrollees before waiting so that the two WPS runs overlap
    dev[0].request("WPS_PIN any " + pin)
    dev[1].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
    ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")
338
def test_ap_wps_conf_pin_timeout(dev, apdev):
    """WPS PIN provisioning with configured AP timing out PIN"""
    ssid = "test-wps-conf-pin"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    sta = dev[0]
    addr = sta.p2p_interface_addr()
    pin = sta.wps_read_pin()
    sta_pin_cmd = "WPS_PIN any " + pin

    # A WPS_PIN command without arguments must be rejected
    if "FAIL" not in hapd.request("WPS_PIN "):
        raise Exception("Unexpected success on invalid WPS_PIN")

    # Enable the PIN with a trailing "1" argument and sleep past it before
    # the station tries; the AP is then expected to ask for a PIN and the
    # station to receive M2D.
    hapd.request("WPS_PIN any %s 1" % pin)
    time.sleep(1.1)
    sta.request(sta_pin_cmd)
    if hapd.wait_event(["WPS-PIN-NEEDED"], timeout=20) is None:
        raise Exception("WPS-PIN-NEEDED event timed out")
    if sta.wait_event(["WPS-M2D"]) is None:
        raise Exception("M2D not reported")
    sta.request("WPS_CANCEL")

    # Re-enable the PIN with "20" and the station address; this time the
    # provisioning is expected to complete.
    hapd.request("WPS_PIN any %s 20 %s" % (pin, addr))
    sta.request(sta_pin_cmd)
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
367
def test_ap_wps_reg_connect(dev, apdev):
    """WPS registrar using AP PIN to connect"""
    ssid = "test-wps-reg-ap-pin"
    appin = "12345670"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "ap_pin": appin }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    logger.info("WPS provisioning step")
    sta = dev[0]
    sta.dump_monitor()
    sta.wps_reg(apdev[0]['bssid'], appin)

    # Verify the connection that resulted from the registrar operation
    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if status['bssid'] != apdev[0]['bssid']:
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    for cipher in ('pairwise_cipher', 'group_cipher'):
        if status[cipher] != 'CCMP':
            raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
389
def check_wps_reg_failure(dev, ap, appin):
    """Run WPS_REG against ap with appin and require a setup locked failure.

    dev is the station object used to issue the command, ap is a dict with
    a 'bssid' entry, and appin is the AP PIN string. An Exception is raised
    on timeout, on WPS success, or when the failure event does not include
    config_error=15.
    """
    dev.request("WPS_REG %s %s" % (ap['bssid'], appin))
    result = dev.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout=15)
    if result is None:
        raise Exception("WPS operation timed out")
    if "WPS-SUCCESS" in result:
        raise Exception("WPS operation succeeded unexpectedly")
    if "config_error=15" in result:
        return
    raise Exception("WPS setup locked state was not reported correctly")
399
def test_ap_wps_random_ap_pin(dev, apdev):
    """WPS registrar using random AP PIN"""
    ssid = "test-wps-reg-random-ap-pin"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ap = apdev[0]
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "device_name": "Wireless AP", "manufacturer": "Company",
                  "model_name": "WAP", "model_number": "123",
                  "serial_number": "12345", "device_type": "6-0050F204-1",
                  "os_version": "01020300",
                  "config_methods": "label push_button",
                  "uuid": ap_uuid, "upnp_iface": "lo" }
    hostapd.add_ap(ap['ifname'], ap_params)
    hapd = hostapd.Hostapd(ap['ifname'])

    # A freshly generated random AP PIN must be readable back and usable
    appin = hapd.request("WPS_AP_PIN random")
    if "FAIL" in appin:
        raise Exception("Could not generate random AP PIN")
    current = hapd.request("WPS_AP_PIN get")
    if appin not in current:
        raise Exception("Could not fetch current AP PIN")
    logger.info("WPS provisioning step")
    dev[0].wps_reg(ap['bssid'], appin)

    # With the AP PIN disabled, the old PIN must be refused
    hapd.request("WPS_AP_PIN disable")
    logger.info("WPS provisioning step with AP PIN disabled")
    check_wps_reg_failure(dev[1], ap, appin)

    # An explicitly set AP PIN must work again
    logger.info("WPS provisioning step with AP PIN reset")
    appin = "12345670"
    hapd.request("WPS_AP_PIN set " + appin)
    dev[1].wps_reg(ap['bssid'], appin)
    for sta in (dev[0], dev[1]):
        sta.request("REMOVE_NETWORK all")
    for sta in (dev[0], dev[1]):
        sta.wait_event(["CTRL-EVENT-DISCONNECTED"])

    # Random AP PIN requested with a trailing "1" argument; after sleeping
    # 1.1 seconds it must no longer be reported or accepted.
    logger.info("WPS provisioning step after AP PIN timeout")
    hapd.request("WPS_AP_PIN disable")
    appin = hapd.request("WPS_AP_PIN random 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[0], ap, appin)

    # Same check with an explicitly set AP PIN
    logger.info("WPS provisioning step after AP PIN timeout(2)")
    hapd.request("WPS_AP_PIN disable")
    appin = "12345670"
    hapd.request("WPS_AP_PIN set %s 1" % appin)
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[1], ap, appin)
452
def test_ap_wps_reg_config(dev, apdev):
    """WPS registrar configuring an AP using AP PIN"""
    ssid = "test-wps-init-ap-pin"
    appin = "12345670"
    bssid = apdev[0]['bssid']
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "ap_pin": appin })
    sta = dev[0]

    # Step 1: configure the AP to WPA2-PSK/CCMP with a new SSID
    logger.info("WPS configuration step")
    sta.dump_monitor()
    new_ssid = "wps-new-ssid"
    new_passphrase = "1234567890"
    sta.wps_reg(bssid, appin, new_ssid, "WPA2PSK", "CCMP", new_passphrase)
    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if status['bssid'] != bssid:
        raise Exception("Not fully connected")
    if status['ssid'] != new_ssid:
        raise Exception("Unexpected SSID")
    for cipher in ('pairwise_cipher', 'group_cipher'):
        if status[cipher] != 'CCMP':
            raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # Step 2: flush old BSS entries, rescan, and configure the AP to open
    logger.info("Re-configure back to open")
    sta.request("REMOVE_NETWORK all")
    sta.request("BSS_FLUSH 0")
    sta.request("SCAN freq=2412 only_new=1")
    if sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
    sta.dump_monitor()
    sta.wps_reg(bssid, appin, "wps-open", "OPEN", "NONE", "")
    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if status['bssid'] != bssid:
        raise Exception("Not fully connected")
    if status['ssid'] != "wps-open":
        raise Exception("Unexpected SSID")
    if status['key_mgmt'] != 'NONE':
        raise Exception("Unexpected key_mgmt")
492
def test_ap_wps_reg_config_ext_processing(dev, apdev):
    """WPS registrar configuring an AP with external config processing"""
    ssid = "test-wps-init-ap-pin"
    appin = "12345670"
    params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
               "wps_cred_processing": "1", "ap_pin": appin }
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
    new_ssid = "wps-new-ssid"
    new_passphrase = "1234567890"
    sta = dev[0]
    sta.wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
                new_passphrase, no_wait=True)
    if sta.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS registrar operation timed out")
    settings = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=15)
    if settings is None:
        raise Exception("WPS configuration timed out")
    if "1026" not in settings:
        raise Exception("AP Settings missing from event")

    # Apply the new settings from the test itself through WPS_CONFIG, which
    # takes the SSID and the passphrase as hexdumps.
    hapd.request("SET wps_cred_processing 0")
    cmd = "WPS_CONFIG %s WPA2PSK CCMP %s" % (new_ssid.encode("hex"),
                                            new_passphrase.encode("hex"))
    if "FAIL" in hapd.request(cmd):
        raise Exception("WPS_CONFIG command failed")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
518
def test_ap_wps_reg_config_tkip(dev, apdev):
    """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP"""
    ssid = "test-wps-init-ap"
    appin = "12345670"
    bssid = apdev[0]['bssid']
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "1",
                     "ap_pin": appin })
    sta = dev[0]
    logger.info("WPS configuration step")
    sta.request("SET wps_version_number 0x10")
    sta.dump_monitor()
    new_ssid = "wps-new-ssid-with-tkip"
    new_passphrase = "1234567890"
    sta.wps_reg(bssid, appin, new_ssid, "WPAPSK", "TKIP", new_passphrase)

    # Reconnect with RSN/CCMP forced on the station side.
    # NOTE(review): network id 0 is presumably the block created by the
    # wps_reg() call above - confirm.
    logger.info("Re-connect to verify WPA2 mixed mode")
    sta.request("DISCONNECT")
    net_id = 0
    sta.set_network(net_id, "pairwise", "CCMP")
    sta.set_network(net_id, "proto", "RSN")
    sta.connect_network(net_id)

    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if status['bssid'] != bssid:
        raise Exception("Not fully connected")
    if status['ssid'] != new_ssid:
        raise Exception("Unexpected SSID")
    # Mixed mode: pairwise CCMP with TKIP as the group cipher
    if status['pairwise_cipher'] != 'CCMP':
        raise Exception("Unexpected encryption configuration")
    if status['group_cipher'] != 'TKIP':
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
548
def test_ap_wps_setup_locked(dev, apdev):
    """WPS registrar locking up AP setup on AP PIN failures"""
    ssid = "test-wps-incorrect-ap-pin"
    appin = "12345670"
    ifname = apdev[0]['ifname']
    bssid = apdev[0]['bssid']
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "ap_pin": appin }
    hostapd.add_ap(ifname, ap_params)
    new_ssid = "wps-new-ssid-test"
    new_passphrase = "1234567890"
    sta = dev[0]

    # Try every incorrect PIN in the list. Each attempt must fail with
    # config_error=18 (wrong PIN) or config_error=15 (setup locked), and
    # the locked state has to be seen at least once.
    wrong_pins = ["55554444", "1234", "12345678", "00000000", "11111111"]
    ap_setup_locked = False
    for pin in wrong_pins:
        sta.dump_monitor()
        logger.info("Try incorrect AP PIN - attempt " + pin)
        sta.wps_reg(bssid, pin, new_ssid, "WPA2PSK",
                    "CCMP", new_passphrase, no_wait=True)
        result = sta.wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
        if result is None:
            raise Exception("Timeout on receiving WPS operation failure event")
        if "CTRL-EVENT-CONNECTED" in result:
            raise Exception("Unexpected connection")
        if "config_error=15" in result:
            logger.info("AP Setup Locked")
            ap_setup_locked = True
        elif "config_error=18" not in result:
            raise Exception("config_error=18 not reported")
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")
        time.sleep(0.1)
    if not ap_setup_locked:
        raise Exception("AP setup was not locked")

    hapd = hostapd.Hostapd(ifname)
    wps_status = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in wps_status:
        raise Exception("WPS failure result not shown correctly")
    peer = "Peer Address: " + sta.p2p_interface_addr()
    if peer not in wps_status:
        raise Exception("Peer address not shown correctly")

    # Enrollee PIN provisioning is expected to succeed after the AP PIN
    # failures above
    time.sleep(0.5)
    sta.dump_monitor()
    logger.info("WPS provisioning step")
    pin = sta.wps_read_pin()
    hapd = hostapd.Hostapd(ifname)
    hapd.request("WPS_PIN any " + pin)
    sta.request("WPS_PIN any " + pin)
    if sta.wait_event(["WPS-SUCCESS"], timeout=30) is None:
        raise Exception("WPS success was not reported")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Generating a new random AP PIN is expected to unlock AP setup
    appin = hapd.request("WPS_AP_PIN random")
    if "FAIL" in appin:
        raise Exception("Could not generate random AP PIN")
    if hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=10) is None:
        raise Exception("Failed to unlock AP PIN")
611
def test_ap_wps_setup_locked_timeout(dev, apdev):
    """WPS re-enabling AP PIN after timeout"""
    ssid = "test-wps-incorrect-ap-pin"
    appin = "12345670"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                  "ap_pin": appin }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    new_ssid = "wps-new-ssid-test"
    new_passphrase = "1234567890"
    sta = dev[0]

    # Feed incorrect AP PINs until the AP reports config_error=15 (setup
    # locked). The loop stops at the first locked report; running out of
    # PINs without seeing one is a failure.
    wrong_pins = ["55554444", "1234", "12345678", "00000000", "11111111"]
    for pin in wrong_pins:
        sta.dump_monitor()
        logger.info("Try incorrect AP PIN - attempt " + pin)
        sta.wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
                    "CCMP", new_passphrase, no_wait=True)
        result = sta.wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
        if result is None:
            raise Exception("Timeout on receiving WPS operation failure event")
        if "CTRL-EVENT-CONNECTED" in result:
            raise Exception("Unexpected connection")
        if "config_error=15" in result:
            logger.info("AP Setup Locked")
            break
        if "config_error=18" not in result:
            raise Exception("config_error=18 not reported")
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")
        time.sleep(0.1)
    else:
        raise Exception("AP setup was not locked")

    # Wait up to 80 seconds for the AP to report the unlock event
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    if hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=80) is None:
        raise Exception("AP PIN did not get unlocked on 60 second timeout")
651
def test_ap_wps_pbc_overlap_2ap(dev, apdev):
    """WPS PBC session overlap with two active APs"""
    # Two independent APs (wps_independent=1) with different SSIDs and
    # passphrases
    ap1_params = { "ssid": "wps1", "eap_server": "1", "wps_state": "2",
                   "wpa_passphrase": "12345678", "wpa": "2",
                   "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                   "wps_independent": "1" }
    ap2_params = { "ssid": "wps2", "eap_server": "1", "wps_state": "2",
                   "wpa_passphrase": "123456789", "wpa": "2",
                   "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                   "wps_independent": "1" }
    hostapd.add_ap(apdev[0]['ifname'], ap1_params)
    hostapd.add_ap(apdev[1]['ifname'], ap2_params)

    # Activate PBC on both APs so the station sees two active sessions
    for ap in (apdev[0], apdev[1]):
        hostapd.Hostapd(ap['ifname']).request("WPS_PBC")

    logger.info("WPS provisioning step")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["WPS-OVERLAP-DETECTED"], timeout=15) is None:
        raise Exception("PBC session overlap not detected")
674
def test_ap_wps_pbc_overlap_2sta(dev, apdev):
    """WPS PBC session overlap with two active STAs"""
    ssid = "test-wps-pbc-overlap"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")

    # Start PBC on both stations before waiting for either result
    stations = [dev[0], dev[1]]
    for sta in stations:
        sta.dump_monitor()
    for sta in stations:
        sta.request("WPS_PBC")

    # Each station must receive M2D with config_error=12
    for idx, sta in enumerate(stations):
        m2d = sta.wait_event(["WPS-M2D"], timeout=15)
        if m2d is None:
            raise Exception("PBC session overlap not detected (dev%d)" % idx)
        if "config_error=12" not in m2d:
            raise Exception("PBC session overlap not correctly reported (dev%d)" % idx)

    # Restarting PBC right after cancel must still be refused
    hapd.request("WPS_CANCEL")
    if "FAIL" not in hapd.request("WPS_PBC"):
        raise Exception("PBC mode allowed to be started while PBC overlap still active")
703
def test_ap_wps_cancel(dev, apdev):
    """WPS AP cancelling enabled config method"""
    ssid = "test-wps-ap-cancel"
    ap_params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                  "wpa_passphrase": "12345678", "wpa": "2",
                  "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    bssid = apdev[0]['bssid']
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    # For each config method: enable it, check that its flag shows up in
    # scan results, cancel it, and check that the flag is gone again.
    phases = [ ("PBC", "WPS_PBC", "WPS-PBC"),
               ("PIN", "WPS_PIN any 12345670", "WPS-AUTH") ]
    for label, enable_cmd, flag_name in phases:
        flag = "[%s]" % flag_name
        logger.info("Verify %s enable/cancel" % label)
        hapd.request(enable_cmd)
        dev[0].scan(freq="2412")
        if flag not in dev[0].get_bss(bssid)['flags']:
            raise Exception(flag_name + " flag missing")
        if "FAIL" in hapd.request("WPS_CANCEL"):
            raise Exception("WPS_CANCEL failed")
        dev[0].scan(freq="2412")
        if flag in dev[0].get_bss(bssid)['flags']:
            raise Exception(flag_name + " flag not cleared")
739
def test_ap_wps_er_add_enrollee(dev, apdev):
    """WPS ER configuring AP and adding a new enrollee using PIN"""
    # dev[0] first configures the AP with the AP PIN, then runs as an ER on
    # the "lo" interface to learn the AP settings and to enroll dev[1] and
    # dev[2] with PINs. Finally the selected registrar flag is verified from
    # scan results while the ER is running and after it has been stopped.
    ssid = "wps-er-add-enrollee"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    er = dev[0]
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "1",
                 "device_name": "Wireless AP", "manufacturer": "Company",
                 "model_name": "WAP", "model_number": "123",
                 "serial_number": "12345", "device_type": "6-0050F204-1",
                 "os_version": "01020300",
                 "config_methods": "label push_button",
                 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)

    logger.info("WPS configuration step")
    new_passphrase = "1234567890"
    er.dump_monitor()
    er.wps_reg(apdev[0]['bssid'], ap_pin, ssid, "WPA2PSK", "CCMP",
               new_passphrase)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if not (status['pairwise_cipher'] == 'CCMP' and
            status['group_cipher'] == 'CCMP'):
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    ap_add = er.wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ap_add is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ap_add:
        raise Exception("Expected AP UUID not found")

    logger.info("Learn AP configuration through UPnP")
    er.dump_monitor()
    er.request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
    settings = er.wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
    if settings is None:
        raise Exception("AP learn timed out")
    # The settings event has to identify the AP and carry the credentials
    # that were configured above.
    expectations = [(ap_uuid, "Expected AP UUID not in settings"),
                    ("ssid=" + ssid, "Expected SSID not in settings"),
                    ("key=" + new_passphrase,
                     "Expected passphrase not in settings")]
    for needle, failure in expectations:
        if needle not in settings:
            raise Exception(failure)

    logger.info("Add Enrollee using ER")
    pin = dev[1].wps_read_pin()
    er.dump_monitor()
    er.request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
    dev[1].dump_monitor()
    dev[1].request("WPS_PIN any " + pin)
    if dev[1].wait_event(["WPS-SUCCESS"], timeout=30) is None:
        raise Exception("Enrollee did not report success")
    if dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")
    hwsim_utils.test_connectivity_sta(er, dev[1])

    logger.info("Add a specific Enrollee using ER")
    pin = dev[2].wps_read_pin()
    addr2 = dev[2].p2p_interface_addr()
    er.dump_monitor()
    dev[2].dump_monitor()
    dev[2].request("WPS_PIN any " + pin)
    seen = er.wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10)
    if seen is None:
        raise Exception("Enrollee not seen")
    if addr2 not in seen:
        raise Exception("Unexpected Enrollee MAC address")
    er.request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2)
    if dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")

    logger.info("Verify registrar selection behavior")
    er.request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
    dev[1].request("DISCONNECT")
    dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
    dev[1].scan(freq="2412")
    flags = dev[1].get_bss(apdev[0]['bssid'])['flags']
    if "[WPS-AUTH]" not in flags:
        raise Exception("WPS-AUTH flag missing")

    logger.info("Stop ER")
    er.dump_monitor()
    er.request("WPS_ER_STOP")
    if er.wait_event(["WPS-ER-AP-REMOVE"]) is None:
        raise Exception("WPS ER unsubscription timed out")
    # The UPnP UNSUBSCRIBE command needs a moment to go through, so give it
    # a short while before checking that the scan results have changed.
    time.sleep(0.2)

    dev[1].scan(freq="2412")
    flags = dev[1].get_bss(apdev[0]['bssid'])['flags']
    if "[WPS-AUTH]" in flags:
        raise Exception("WPS-AUTH flag not removed")
848
def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
    """WPS ER connected to AP and adding a new enrollee using PBC"""
    # dev[0] learns the AP configuration with the AP PIN, starts an ER on
    # "lo" and then provisions dev[1] through WPS_ER_PBC.
    ssid = "wps-er-add-enrollee-pbc"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    er = dev[0]
    sta = dev[1]
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                 "device_name": "Wireless AP", "manufacturer": "Company",
                 "model_name": "WAP", "model_number": "123",
                 "serial_number": "12345", "device_type": "6-0050F204-1",
                 "os_version": "01020300",
                 "config_methods": "label push_button",
                 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)

    logger.info("Learn AP configuration")
    er.dump_monitor()
    er.wps_reg(apdev[0]['bssid'], ap_pin)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    ap_add = er.wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ap_add is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ap_add:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    er.request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PBC")
    er.dump_monitor()
    enrollee = sta.p2p_interface_addr()
    sta.dump_monitor()
    sta.request("WPS_PBC")

    # Accept the expected Enrollee in either of the first two
    # WPS-ER-ENROLLEE-ADD events; anything else is a failure.
    for _ in range(2):
        added = er.wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=15)
        if added is None:
            raise Exception("Enrollee discovery timed out")
        if enrollee in added:
            break
    else:
        raise Exception("Expected Enrollee not found")
    er.request("WPS_ER_PBC " + enrollee)

    if sta.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("Enrollee did not report success")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")
    hwsim_utils.test_connectivity_sta(er, sta)

    # The AP can also be selected with its BSSID instead of the UUID.
    reply = er.request("WPS_ER_SET_CONFIG " + apdev[0]['bssid'] + " 0")
    if "FAIL" in reply:
        raise Exception("Could not select AP based on BSSID")
912
def test_ap_wps_er_v10_add_enrollee_pin(dev, apdev):
    """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
    # Same flow as the PBC variant, but dev[0] is switched to
    # wps_version_number 0x10 before it learns the AP configuration, and
    # dev[1] is enrolled with a PIN.
    ssid = "wps-er-add-enrollee-pbc"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    er = dev[0]
    sta = dev[1]
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                 "device_name": "Wireless AP", "manufacturer": "Company",
                 "model_name": "WAP", "model_number": "123",
                 "serial_number": "12345", "device_type": "6-0050F204-1",
                 "os_version": "01020300",
                 "config_methods": "label push_button",
                 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)

    logger.info("Learn AP configuration")
    er.request("SET wps_version_number 0x10")
    er.dump_monitor()
    er.wps_reg(apdev[0]['bssid'], ap_pin)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    ap_add = er.wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ap_add is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ap_add:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    er.request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PIN")
    enrollee = sta.p2p_interface_addr()
    pin = sta.wps_read_pin()
    er.dump_monitor()
    er.request("WPS_ER_PIN any " + pin + " " + enrollee)
    sta.dump_monitor()
    sta.request("WPS_PIN any " + pin)
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")
960
def test_ap_wps_er_config_ap(dev, apdev):
    """WPS ER configuring AP over UPnP"""
    # dev[0] joins the AP with the original passphrase, starts an ER on "lo"
    # and pushes a new WPA2-PSK/CCMP configuration with WPS_ER_CONFIG. The
    # test passes when dev[0] can reconnect with the new passphrase.
    ssid = "wps-er-ap-config"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "device_name": "Wireless AP", "manufacturer": "Company",
                     "model_name": "WAP", "model_number": "123",
                     "serial_number": "12345", "device_type": "6-0050F204-1",
                     "os_version": "01020300",
                     "config_methods": "label push_button",
                     "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})

    logger.info("Connect ER to the AP")
    dev[0].connect(ssid, psk="12345678", scan_freq="2412")

    logger.info("WPS configuration step")
    dev[0].request("WPS_ER_START ifname=lo")
    ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ev is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")
    new_passphrase = "1234567890"
    # SSID and passphrase are passed hex-encoded in the WPS_ER_CONFIG command.
    dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
                   ssid.encode("hex") + " WPA2PSK CCMP " +
                   new_passphrase.encode("hex"))
    ev = dev[0].wait_event(["WPS-SUCCESS"])
    if ev is None:
        raise Exception("WPS ER configuration operation timed out")
    # dev[0] is the only station associated in this test, so it is the one
    # whose disconnection has to be waited for before reconnecting. (The
    # wait used to be on dev[1], which never connects here.) The result is
    # left unchecked on purpose; the connect() below is the real check.
    dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
    dev[0].connect(ssid, psk="1234567890", scan_freq="2412")
996
def test_ap_wps_fragmentation(dev, apdev):
    """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
    # Both ends are limited to a fragment size of 50 and the AP runs in
    # mixed WPA/WPA2 mode, so the station is expected to end up with CCMP
    # pairwise, TKIP group and WPA2-PSK key management.
    ssid = "test-wps-fragmentation"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "3",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                 "wpa_pairwise": "TKIP",
                 "fragment_size": "50"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta = dev[0]
    sta.dump_monitor()
    sta.request("SET wps_fragment_size 50")
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    if not (status['pairwise_cipher'] == 'CCMP' and
            status['group_cipher'] == 'TKIP'):
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
1022
def test_ap_wps_new_version_sta(dev, apdev):
    """WPS compatibility with new version number on the station"""
    # The station is set to wps_version_number 0x43 and given a vendor
    # extension for M1 before a normal PBC provisioning run.
    ssid = "test-wps-ver"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta = dev[0]
    sta.dump_monitor()
    for cmd in ["SET wps_version_number 0x43",
                "SET wps_vendor_ext_m1 000137100100020001",
                "WPS_PBC"]:
        sta.request(cmd)
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
1040
def test_ap_wps_new_version_ap(dev, apdev):
    """WPS compatibility with new version number on the AP"""
    # The AP is set to wps_version_number 0x43 for the PBC provisioning run
    # and set back to 0x20 afterwards.
    ssid = "test-wps-ver"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("WPS provisioning step")
    if "FAIL" in hapd.request("SET wps_version_number 0x43"):
        raise Exception("Failed to enable test functionality")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    connected = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    # The version number is restored before the result is evaluated so that
    # the AP setting is reset even when the association timed out.
    hapd.request("SET wps_version_number 0x20")
    if connected is None:
        raise Exception("Association with the AP timed out")
1059
def test_ap_wps_check_pin(dev, apdev):
    """Verify PIN checking through control interface"""
    ap_params = {"ssid": "wps", "eap_server": "1", "wps_state": "2",
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    # (PIN as entered, expected WPS_CHECK_PIN reply). The AP and the station
    # have to give the same answer for every entry.
    cases = [("12345670", "12345670"),
             ("12345678", "FAIL-CHECKSUM"),
             ("1234-5670", "12345670"),
             ("1234 5670", "12345670"),
             ("1-2.3:4 5670", "12345670")]
    for entered, expected in cases:
        ap_reply = hapd.request("WPS_CHECK_PIN " + entered).rstrip('\n')
        sta_reply = dev[0].request("WPS_CHECK_PIN " + entered).rstrip('\n')
        if ap_reply != sta_reply:
            raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
        if ap_reply != expected:
            raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(ap_reply, expected))

    # PINs of these lengths have to be rejected.
    for bad_pin in ["12345", "123456789"]:
        if "FAIL" not in hapd.request("WPS_CHECK_PIN " + bad_pin):
            raise Exception("Unexpected WPS_CHECK_PIN success")
1083
def test_ap_wps_wep_config(dev, apdev):
    """WPS 2.0 AP rejecting WEP configuration"""
    # dev[0] tries to configure the AP with a WEP network using the AP PIN.
    # The AP has to report WPS-FAIL with reason 2 and show the failure
    # details in WPS_GET_STATUS.
    ssid = "test-wps-config"
    appin = "12345670"
    hostapd.add_ap(apdev[0]['ifname'],
                   {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                    "ap_pin": appin})
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN", "WEP",
                   "hello", no_wait=True)

    failure = hapd.wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL timed out")
    if "reason=2" not in failure:
        raise Exception("Unexpected reason code in WPS-FAIL")

    status = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in status:
        raise Exception("WPS failure result not shown correctly")
    if "Failure Reason: WEP Prohibited" not in status:
        raise Exception("Failure reason not reported correctly")
    peer_line = "Peer Address: " + dev[0].p2p_interface_addr()
    if peer_line not in status:
        raise Exception("Peer address not shown correctly")
1106
def test_ap_wps_wep_enroll(dev, apdev):
    """WPS 2.0 STA rejecting WEP configuration"""
    # The AP skips building its own credential and serves the contents of
    # "wps-wep-cred" instead; the station has to fail the PBC run with
    # "WEP Prohibited".
    ssid = "test-wps-wep"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "skip_cred_build": "1", "extra_cred": "wps-wep-cred"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    hapd.request("WPS_PBC")
    dev[0].request("WPS_PBC")

    failure = dev[0].wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL event timed out")
    if not ("msg=12" in failure and "reason=2 (WEP Prohibited)" in failure):
        raise Exception("Unexpected WPS-FAIL event: " + failure)
1121
def test_ap_wps_ie_fragmentation(dev, apdev):
    """WPS AP using fragmented WPS IE"""
    # Long device description strings are configured on the AP and the test
    # expects exactly two WPS vendor IEs (dd..0050f204) in the BSS entry.
    ssid = "test-wps-ie-fragmentation"
    long_name = "1234567890abcdef1234567890abcdef"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "wpa_passphrase": "12345678", "wpa": "2",
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    params["device_name"] = long_name
    params["manufacturer"] = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
    params["model_name"] = long_name
    params["model_number"] = long_name
    params["serial_number"] = long_name
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    hapd.request("WPS_PBC")
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    bss = dev[0].get_bss(apdev[0]['bssid'])
    name_ok = ("wps_device_name" in bss and
               bss['wps_device_name'] == "1234567890abcdef1234567890abcdef")
    if not name_ok:
        logger.info(bss)
        raise Exception("Device Name not received correctly")
    wps_ies = re.findall("dd..0050f204", bss['ie'])
    if len(wps_ies) != 2:
        raise Exception("Unexpected number of WPS IEs")
1146
def get_psk(pskfile):
    """Read a PSK file and return its entries as a dictionary.

    Each entry line is expected to be "<address> <psk>". Blank lines and
    lines starting with '#' (such as the "# WPA PSKs" header) are skipped.
    Only the first space separates the address from the PSK, so a PSK that
    itself contains spaces is kept intact.

    pskfile: path of the file to read.
    Returns a dict mapping the address string to the PSK string; a later
    entry for the same address replaces an earlier one.
    Raises ValueError if an entry line does not contain a space.
    """
    psks = {}
    with open(pskfile, "r") as f:
        for line in f.read().splitlines():
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            addr, psk = line.split(' ', 1)
            psks[addr] = psk
    return psks
1157
def test_ap_wps_per_station_psk(dev, apdev):
    """WPS PBC provisioning with per-station PSK"""
    # Two PBC enrollees and one external registrar are provisioned against
    # an AP that uses wpa_psk_file. Every station must get its own entry in
    # that file, and dev[0] must get a different PSK when it later
    # provisions itself again as an external registrar.
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    ssid = "wps"
    appin = "12345670"
    pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
    # Best-effort removal of a file left behind by an earlier run; a missing
    # file is the normal case. Only OS-level errors are ignored so that
    # things like KeyboardInterrupt still propagate.
    try:
        os.remove(pskfile)
    except OSError:
        pass

    try:
        with open(pskfile, "w") as f:
            f.write("# WPA PSKs\n")

        params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                   "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
                   "rsn_pairwise": "CCMP", "ap_pin": appin,
                   "wpa_psk_file": pskfile }
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)

        logger.info("First enrollee")
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (1)")

        logger.info("Second enrollee")
        hapd.request("WPS_PBC")
        dev[1].request("WPS_PBC")
        ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (2)")

        logger.info("External registrar")
        dev[2].wps_reg(apdev[0]['bssid'], appin)

        logger.info("Verifying PSK results")
        psks = get_psk(pskfile)
        if addr0 not in psks:
            raise Exception("No PSK recorded for sta0")
        if addr1 not in psks:
            raise Exception("No PSK recorded for sta1")
        if addr2 not in psks:
            raise Exception("No PSK recorded for sta2")
        if psks[addr0] == psks[addr1]:
            raise Exception("Same PSK recorded for sta0 and sta1")
        if psks[addr0] == psks[addr2]:
            raise Exception("Same PSK recorded for sta0 and sta2")
        if psks[addr1] == psks[addr2]:
            raise Exception("Same PSK recorded for sta1 and sta2")

        dev[0].request("REMOVE_NETWORK all")
        logger.info("Second external registrar")
        dev[0].wps_reg(apdev[0]['bssid'], appin)
        psks2 = get_psk(pskfile)
        if addr0 not in psks2:
            raise Exception("No PSK recorded for sta0(reg)")
        if psks[addr0] == psks2[addr0]:
            raise Exception("Same PSK recorded for sta0(enrollee) and sta0(reg)")
    finally:
        os.remove(pskfile)
1223
def test_ap_wps_per_station_psk_failure(dev, apdev):
    """WPS PBC provisioning with per-station PSK (file not writable)"""
    # The AP starts with a valid wpa_psk_file and is then pointed at a path
    # under /tmp/does/not/exists. Provisioning is still expected to work,
    # but nothing may be recorded in the original file.
    ssid = "wps"
    appin = "12345670"
    pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
    # Best-effort removal of a file left behind by an earlier run; a missing
    # file is the normal case. Only OS-level errors are ignored so that
    # things like KeyboardInterrupt still propagate.
    try:
        os.remove(pskfile)
    except OSError:
        pass

    try:
        with open(pskfile, "w") as f:
            f.write("# WPA PSKs\n")

        params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                   "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
                   "rsn_pairwise": "CCMP", "ap_pin": appin,
                   "wpa_psk_file": pskfile }
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)
        if "FAIL" in hapd.request("SET wpa_psk_file /tmp/does/not/exists/ap_wps_per_enrollee_psk_failure.psk_file"):
            raise Exception("Failed to set wpa_psk_file")

        logger.info("First enrollee")
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (1)")

        logger.info("Second enrollee")
        hapd.request("WPS_PBC")
        dev[1].request("WPS_PBC")
        ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (2)")

        logger.info("External registrar")
        dev[2].wps_reg(apdev[0]['bssid'], appin)

        logger.info("Verifying PSK results")
        psks = get_psk(pskfile)
        if psks:
            raise Exception("PSK recorded unexpectedly")
    finally:
        os.remove(pskfile)
1272
def test_ap_wps_pin_request_file(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    # The AP is configured with wps_pin_requests pointing at a log file.
    # After the station starts a PIN run that the AP knows nothing about,
    # the AP has to emit WPS-PIN-NEEDED and the station UUID has to appear
    # in that file.
    ssid = "wps"
    pinfile = "/tmp/ap_wps_pin_request_file.log"
    if os.path.exists(pinfile):
        subprocess.call(['sudo', 'rm', pinfile])
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                 "wps_pin_requests": pinfile,
                 "wpa_passphrase": "12345678", "wpa": "2",
                 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    uuid = dev[0].get_status_field("uuid")
    pin = dev[0].wps_read_pin()
    try:
        dev[0].request("WPS_PIN any " + pin)
        needed = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=15)
        if needed is None:
            raise Exception("PIN needed event not shown")
        if uuid not in needed:
            raise Exception("UUID mismatch")
        dev[0].request("WPS_CANCEL")
        with open(pinfile, "r") as f:
            entries = f.readlines()
        if not any(uuid in entry for entry in entries):
            raise Exception("PIN request entry not in the log file")
    finally:
        # The log file is removed through sudo both here and before the test.
        subprocess.call(['sudo', 'rm', pinfile])
1306
def test_ap_wps_auto_setup_with_config_file(dev, apdev):
    """WPS auto-setup with configuration file"""
    # An unconfigured (wps_state=1) AP is started from a configuration file.
    # After a PBC run the file is read back and has to show wps_state=2, a
    # WPA-PSK key management setting and the untouched ieee80211n=1 line.
    conffile = "/tmp/ap_wps_auto_setup_with_config_file.conf"
    ifname = apdev[0]['ifname']
    try:
        with open(conffile, "w") as f:
            f.write("driver=nl80211\n")
            f.write("hw_mode=g\n")
            f.write("channel=1\n")
            f.write("ieee80211n=1\n")
            f.write("interface=%s\n" % ifname)
            f.write("ctrl_interface=/var/run/hostapd\n")
            f.write("ssid=wps\n")
            f.write("eap_server=1\n")
            f.write("wps_state=1\n")
        hostapd.add_bss('phy3', ifname, conffile)
        hapd = hostapd.Hostapd(ifname)
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
        if ev is None:
            raise Exception("Association with the AP timed out")
        with open(conffile, "r") as f:
            lines = f.read().splitlines()
        vals = dict()
        for l in lines:
            if '=' in l:
                # Split on the first '=' only so values may contain '='.
                name, value = l.split('=', 1)
                vals[name] = value
            elif "# WPS configuration" not in l:
                # The only line without '=' that is accepted is the
                # "# WPS configuration" marker.
                raise Exception("Unexpected configuration line: " + l)
        # .get() is used so that a missing parameter is reported as an
        # incorrect configuration (with the parsed values) rather than as a
        # bare KeyError.
        if (vals.get('ieee80211n') != '1' or vals.get('wps_state') != '2' or
            "WPA-PSK" not in vals.get('wpa_key_mgmt', '')):
            raise Exception("Incorrect configuration: " + str(vals))
    finally:
        subprocess.call(['sudo', 'rm', conffile])
1345
def test_ap_wps_pbc_timeout(dev, apdev, params):
    """wpa_supplicant PBC walk time [long]"""
    # Only runs when params['long'] is set. The station starts PBC without
    # the AP ever doing so and has to report WPS-TIMEOUT within 150 seconds.
    if not params['long']:
        logger.info("Skip test case with long duration due to --long not specified")
        return "skip"
    ssid = "test-wps"
    ap_params = {"ssid": ssid, "eap_server": "1", "wps_state": "1"}
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("Start WPS_PBC and wait for PBC walk time expiration")
    reply = dev[0].request("WPS_PBC")
    if "OK" not in reply:
        raise Exception("WPS_PBC failed")
    if dev[0].wait_event(["WPS-TIMEOUT"], timeout=150) is None:
        raise Exception("WPS-TIMEOUT not reported")
1361
def add_ssdp_ap(ifname, ap_uuid):
    """Start the WPS AP used by the SSDP/UPnP test cases.

    ifname: AP interface name passed to hostapd.add_ap().
    ap_uuid: value for the AP's "uuid" parameter.
    The AP uses SSID "wps-ssdp", WPA2-PSK/CCMP and "lo" as its UPnP
    interface.
    """
    params = {"ssid": "wps-ssdp", "eap_server": "1", "wps_state": "2"}
    # Security configuration
    params.update({"wpa_passphrase": "12345678", "wpa": "2",
                   "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    # WPS device description
    params.update({"device_name": "Wireless AP", "manufacturer": "Company",
                   "model_name": "WAP", "model_number": "123",
                   "serial_number": "12345",
                   "device_type": "6-0050F204-1",
                   "os_version": "01020300",
                   "config_methods": "label push_button"})
    # AP PIN, UUID and UPnP settings
    params.update({"ap_pin": "12345670", "uuid": ap_uuid,
                   "upnp_iface": "lo",
                   "friendly_name": "WPS Access Point",
                   "manufacturer_url": "http://www.example.com/",
                   "model_description": "Wireless Access Point",
                   "model_url": "http://www.example.com/model/",
                   "upc": "123456789012"})
    hostapd.add_ap(ifname, params)
1380
def ssdp_send(msg, no_recv=False):
    """Send one SSDP message to 239.255.255.250:1900 from 127.0.0.1.

    msg: the datagram payload.
    no_recv: if true, do not wait for a reply.
    Returns None when no_recv is set, otherwise the data (at most 1000
    bytes) of the first datagram received on the sending socket.
    Raises socket.timeout if no reply arrives within the one second default
    timeout set here.
    Note that socket.setdefaulttimeout() is process-wide; it is kept as-is
    since other code may rely on it having been set.
    """
    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # try/finally so the socket is released on every path, including a
    # receive timeout.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))
        sock.sendto(msg, ("239.255.255.250", 1900))
        if no_recv:
            return None
        return sock.recv(1000)
    finally:
        sock.close()
1391
def ssdp_send_msearch(st):
    """Send an SSDP M-SEARCH for search target st and return the reply.

    The request uses MX 1 and is sent with ssdp_send(), whose return value
    is passed through.
    """
    headers = ['M-SEARCH * HTTP/1.1',
               'HOST: 239.255.255.250:1900',
               'MX: 1',
               'MAN: "ssdp:discover"',
               'ST: ' + st]
    # Every header line is terminated with CRLF, followed by the empty line
    # that ends the header block.
    request = '\r\n'.join(headers) + '\r\n' + '\r\n'
    return ssdp_send(request)
1401
def test_ap_wps_ssdp_msearch(dev, apdev):
    """WPS AP and SSDP M-SEARCH messages"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
    eol = '\r\n'

    # Header names in mixed case
    mixed_case = ['M-SEARCH * HTTP/1.1',
                  'Host: 239.255.255.250:1900',
                  'Mx: 1',
                  'Man: "ssdp:discover"',
                  'St: urn:schemas-wifialliance-org:device:WFADevice:1',
                  '', '']
    ssdp_send(eol.join(mixed_case))

    # Lower case header names with extra tabs and spaces around the values
    padded = ['M-SEARCH * HTTP/1.1',
              'host:\t239.255.255.250:1900\t\t\t\t \t\t',
              'mx: \t1\t\t ',
              'man: \t \t "ssdp:discover" ',
              'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t',
              '', '']
    ssdp_send(eol.join(padded))

    # Each of these search targets has to produce a reply; ssdp_send()
    # raises on a receive timeout.
    targets = ["ssdp:all",
               "upnp:rootdevice",
               "uuid:" + ap_uuid,
               "urn:schemas-wifialliance-org:service:WFAWLANConfig:1",
               "urn:schemas-wifialliance-org:device:WFADevice:1"]
    for target in targets:
        ssdp_send_msearch(target)

    # MX value of 130; no reply is read for this one.
    large_mx = ['M-SEARCH * HTTP/1.1',
                'HOST:\t239.255.255.250:1900',
                'MAN: "ssdp:discover"',
                'MX: 130',
                'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
                '', '']
    ssdp_send(eol.join(large_mx), no_recv=True)
1439
def test_ap_wps_ssdp_invalid_msearch(dev, apdev):
    """WPS AP and invalid SSDP M-SEARCH messages"""
    # A series of malformed or non-matching M-SEARCH requests is sent from
    # one socket. None of them may produce a reply, while a final valid
    # request sent from the same socket has to.
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    group = ("239.255.255.250", 1900)
    req = 'M-SEARCH * HTTP/1.1'
    host = 'HOST: 239.255.255.250:1900'
    man = 'MAN: "ssdp:discover"'
    mx = 'MX: 1'
    st = 'ST: urn:schemas-wifialliance-org:device:WFADevice:1'

    # (debug log entry, list of messages); each message is given as its
    # header lines in the order they are sent.
    invalid = [
        ("Missing MX", [[req, host, man, st]]),
        ("Negative MX", [[req, host, 'MX: -1', man, st]]),
        ("Invalid MX", [[req, host, 'MX; 1', man, st]]),
        ("Missing MAN", [[req, host, mx, st]]),
        ("Invalid MAN", [[req, host, mx, 'MAN: foo', st],
                         [req, host, mx, 'MAN; "ssdp:discover"', st]]),
        ("Missing HOST", [[req, man, mx, st]]),
        ("Missing ST", [[req, host, man, mx]]),
        ("Mismatching ST",
         [[req, host, man, mx,
           'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d'],
          [req, host, man, mx, 'ST: foo:bar'],
          [req, host, man, mx, 'ST: foobar']]),
        ("Invalid ST",
         [[req, host, man, mx,
           'ST; urn:schemas-wifialliance-org:device:WFADevice:1']]),
        ("Invalid M-SEARCH",
         [['M+SEARCH * HTTP/1.1', host, man, mx, st],
          ['M-SEARCH-* HTTP/1.1', host, man, mx, st]]),
    ]

    def build(lines, eol='\r\n'):
        # Terminate every header line with eol and end with an empty line.
        return eol.join(lines + ['', ''])

    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # try/finally so the socket is closed also when the test fails.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))

        for description, messages in invalid:
            logger.debug(description)
            for lines in messages:
                sock.sendto(build(lines), group)

        logger.debug("Invalid message format")
        sock.sendto("NOTIFY * HTTP/1.1", group)
        # Otherwise valid request, but with bare CR as the line terminator
        sock.sendto(build([req, host, man, mx, st], eol='\r'), group)

        # Nothing sent so far may have triggered a response.
        try:
            r = sock.recv(1000)
        except socket.timeout:
            pass
        else:
            raise Exception("Unexpected M-SEARCH response: " + r)

        logger.debug("Valid M-SEARCH")
        sock.sendto(build([req, host, man, mx, st]), group)

        try:
            sock.recv(1000)
        except socket.timeout:
            raise Exception("No SSDP response")
    finally:
        sock.close()
1611
def test_ap_wps_ssdp_burst(dev, apdev):
    """WPS AP and SSDP burst"""
    # 25 identical M-SEARCH requests are sent back to back. At least 20
    # "200 OK" replies have to arrive before the one second receive timeout,
    # and after a second burst from a new socket the AP still has to answer
    # with a reply that mentions its UUID.
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    msg = '\r\n'.join([
            'M-SEARCH * HTTP/1.1',
            'HOST: 239.255.255.250:1900',
            'MAN: "ssdp:discover"',
            'MX: 1',
            'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
            '', ''])
    group = ("239.255.255.250", 1900)

    def open_socket():
        # UDP socket bound to an ephemeral port on loopback; closed again
        # if any of the setup steps fails.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                          socket.IPPROTO_UDP)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
            s.bind(("127.0.0.1", 0))
        except Exception:
            s.close()
            raise
        return s

    socket.setdefaulttimeout(1)
    sock = open_socket()
    try:
        for i in range(0, 25):
            sock.sendto(msg, group)
        resp = 0
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                break
            if not r.startswith("HTTP/1.1 200 OK\r\n"):
                raise Exception("Unexpected message: " + r)
            resp += 1
    finally:
        sock.close()
    if resp < 20:
        raise Exception("Too few SSDP responses")

    sock = open_socket()
    try:
        for i in range(0, 25):
            sock.sendto(msg, group)
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                raise Exception("No SSDP response")
            if ap_uuid in r:
                break
    finally:
        sock.close()
1656
def ssdp_get_location(uuid):
    """Return the value of the Location header for the given device UUID.

    The response text is obtained from ssdp_send_msearch("uuid:" + uuid)
    (presumably an SSDP M-SEARCH reply; that helper is defined elsewhere in
    this file). The first line starting with "location:" (case-insensitive)
    is used and everything after its first colon is returned with
    surrounding whitespace removed.

    Raises Exception if no such line is present.
    """
    response = ssdp_send_msearch("uuid:" + uuid)
    for line in response.splitlines():
        if not line.lower().startswith("location:"):
            continue
        # Split only on the first colon; the URL itself contains colons.
        return line.split(':', 1)[1].strip()
    raise Exception("No UPnP location found")
1667
def upnp_get_urls(location):
    """Fetch the UPnP device description and return its service URLs.

    The XML document at location is downloaded and parsed, and the first
    device/serviceList/service element in the
    urn:schemas-upnp-org:device-1-0 namespace is looked up.

    Returns a dict with the keys 'scpd_url', 'control_url' and
    'event_sub_url'. Each value is the text of the SCPDURL, controlURL or
    eventSubURL element resolved against location with urlparse.urljoin().

    Raises Exception if the service element or one of the URL elements is
    missing from the description.
    """
    urn = '{urn:schemas-upnp-org:device-1-0}'
    conn = urllib.urlopen(location)
    try:
        root = ET.parse(conn).getroot()
    finally:
        # Release the HTTP handle even if the document does not parse.
        conn.close()

    service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
    if service is None:
        raise Exception("No UPnP service found in device description")

    res = {}
    for key, tag in [('scpd_url', 'SCPDURL'),
                     ('control_url', 'controlURL'),
                     ('event_sub_url', 'eventSubURL')]:
        elem = service.find(urn + tag)
        if elem is None:
            raise Exception("No %s found in UPnP service description" % tag)
        res[key] = urlparse.urljoin(location, elem.text)
    return res
1679
def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
    """POST a SOAP request for a WFAWLANConfig action and return the response.

    conn is used through its request() and getresponse() methods (callers in
    this file pass an httplib.HTTPConnection). path is the request path.
    action is the name of the element placed, without any children, inside
    the SOAP Body in the WFAWLANConfig:1 namespace.

    When include_soap_action is true, a SOAPAction header naming the action
    is added. Otherwise soap_action_override, if non-empty, is sent verbatim
    as the SOAPAction header; with neither, no SOAPAction header is sent.

    Returns the object returned by conn.getresponse().
    """
    soap_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
    wps_ns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
    ET.register_namespace('soapenv', soap_ns)
    ET.register_namespace('wfa', wps_ns)

    # Envelope -> Body -> <action/> with the encodingStyle attribute on the
    # envelope element.
    envelope = ET.Element(
        "{%s}Envelope" % soap_ns,
        attrib={'{%s}encodingStyle' % soap_ns:
                'http://schemas.xmlsoap.org/soap/encoding/'})
    soap_body = ET.SubElement(envelope, "{%s}Body" % soap_ns)
    ET.SubElement(soap_body, "{%s}%s" % (wps_ns, action))

    payload = StringIO.StringIO()
    ET.ElementTree(envelope).write(payload, xml_declaration=True,
                                   encoding='utf-8')

    # The override is only consulted when the regular header is suppressed.
    soap_action = None
    if include_soap_action:
        soap_action = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
    elif soap_action_override:
        soap_action = soap_action_override

    headers = { "Content-type": 'text/xml; charset="utf-8"' }
    if soap_action is not None:
        headers["SOAPAction"] = soap_action

    conn.request("POST", path, payload.getvalue(), headers)
    return conn.getresponse()
1701
def _upnp_expect_status(resp, expected):
    """Raise Exception unless resp.status equals the expected HTTP status."""
    if resp.status != expected:
        raise Exception("Unexpected HTTP response: %s" % resp.status)

def test_ap_wps_upnp(dev, apdev):
    """WPS AP and UPnP operations"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    location = ssdp_get_location(ap_uuid)
    urls = upnp_get_urls(location)

    # The SCPD document only has to be retrievable; its content is not
    # inspected.
    page = urllib.urlopen(urls['scpd_url'])
    try:
        page.read()
    finally:
        page.close()

    page = urllib.urlopen(urlparse.urljoin(location, "unknown.html"))
    try:
        code = page.getcode()
    finally:
        page.close()
    if code != 404:
        raise Exception("Unexpected HTTP response to GET unknown URL")

    url = urlparse.urlparse(location)
    ctrlurl = urlparse.urlparse(urls['control_url'])
    conn = httplib.HTTPConnection(url.netloc)
    #conn.set_debuglevel(1)
    try:
        headers = { "Content-type": 'text/xml; charset="utf-8"',
                    "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
        # Valid SOAPAction but a path that is not the control URL.
        conn.request("POST", "hello", "\r\n\r\n", headers)
        _upnp_expect_status(conn.getresponse(), 404)

        # Unsupported HTTP method.
        conn.request("UNKNOWN", "hello", "\r\n\r\n", headers)
        _upnp_expect_status(conn.getresponse(), 501)

        # Correct control URL but a SOAPAction for a different service URN.
        headers = { "Content-type": 'text/xml; charset="utf-8"',
                    "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
        conn.request("POST", ctrlurl.path, "\r\n\r\n", headers)
        _upnp_expect_status(conn.getresponse(), 401)

        logger.debug("GetDeviceInfo without SOAPAction header")
        resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
                                include_soap_action=False)
        _upnp_expect_status(resp, 401)

        logger.debug("GetDeviceInfo with invalid SOAPAction header")
        for act in [ "foo",
                     "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
                     '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
                     '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']:
            resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
                                    include_soap_action=False,
                                    soap_action_override=act)
            _upnp_expect_status(resp, 401)

        resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo")
        _upnp_expect_status(resp, 200)
        # Kept in its own local so that the dev argument is not overwritten.
        device_info = resp.read()
        if "NewDeviceInfo" not in device_info:
            raise Exception("Unexpected GetDeviceInfo response")

        # Actions sent with an empty action element, in the original order,
        # together with the HTTP status each one is expected to produce.
        for description, action, status in [
                ("PutMessage without required parameters", "PutMessage", 600),
                ("PutWLANResponse without required parameters",
                 "PutWLANResponse", 600),
                ("SetSelectedRegistrar from unregistered ER",
                 "SetSelectedRegistrar", 501),
                ("Unknown action", "Unknown", 401)]:
            logger.debug(description)
            resp = upnp_soap_action(conn, ctrlurl.path, action)
            _upnp_expect_status(resp, status)
    finally:
        conn.close()
1783
1784 def test_ap_wps_upnp_subscribe(dev, apdev):
1785 """WPS AP and UPnP event subscription"""
1786 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1787 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1788
1789 location = ssdp_get_location(ap_uuid)
1790 urls = upnp_get_urls(location)
1791 eventurl = urlparse.urlparse(urls['event_sub_url'])
1792
1793 url = urlparse.urlparse(location)
1794 conn = httplib.HTTPConnection(url.netloc)
1795 #conn.set_debuglevel(1)
1796 headers = { "callback": '<http://127.0.0.1:12345/event>',
1797 "timeout": "Second-1234" }
1798 conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers)
1799 resp = conn.getresponse()
1800 if resp.status != 412:
1801 raise Exception("Unexpected HTTP response: %s" % resp.status)
1802
1803 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1804 resp = conn.getresponse()
1805 if resp.status != 412:
1806 raise Exception("Unexpected HTTP response: %s" % resp.status)
1807
1808 headers = { "NT": "upnp:event",
1809 "timeout": "Second-1234" }
1810 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1811 resp = conn.getresponse()
1812 if resp.status != 412:
1813 raise Exception("Unexpected HTTP response: %s" % resp.status)
1814
1815 headers = { "callback": '<http://127.0.0.1:12345/event>',
1816 "NT": "upnp:foobar",
1817 "timeout": "Second-1234" }
1818 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1819 resp = conn.getresponse()
1820 if resp.status != 400:
1821 raise Exception("Unexpected HTTP response: %s" % resp.status)
1822
1823 logger.debug("Valid subscription")
1824 headers = { "callback": '<http://127.0.0.1:12345/event>',
1825 "NT": "upnp:event",
1826 "timeout": "Second-1234" }
1827 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1828 resp = conn.getresponse()
1829 if resp.status != 200:
1830 raise Exception("Unexpected HTTP response: %s" % resp.status)
1831 sid = resp.getheader("sid")
1832 logger.debug("Subscription SID " + sid)
1833
1834 logger.debug("Invalid re-subscription")
1835 headers = { "NT": "upnp:event",
1836 "sid": "123456734567854",
1837 "timeout": "Second-1234" }
1838 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1839 resp = conn.getresponse()
1840 if resp.status != 400:
1841 raise Exception("Unexpected HTTP response: %s" % resp.status)
1842
1843 logger.debug("Invalid re-subscription")
1844 headers = { "NT": "upnp:event",
1845 "sid": "uuid:123456734567854",
1846 "timeout": "Second-1234" }
1847 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1848 resp = conn.getresponse()
1849 if resp.status != 400:
1850 raise Exception("Unexpected HTTP response: %s" % resp.status)
1851
1852 logger.debug("Invalid re-subscription")
1853 headers = { "callback": '<http://127.0.0.1:12345/event>',
1854 "NT": "upnp:event",
1855 "sid": sid,
1856 "timeout": "Second-1234" }
1857 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1858 resp = conn.getresponse()
1859 if resp.status != 400:
1860 raise Exception("Unexpected HTTP response: %s" % resp.status)
1861
1862 logger.debug("SID mismatch in re-subscription")
1863 headers = { "NT": "upnp:event",
1864 "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
1865 "timeout": "Second-1234" }
1866 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1867 resp = conn.getresponse()
1868 if resp.status != 412:
1869 raise Exception("Unexpected HTTP response: %s" % resp.status)
1870
1871 logger.debug("Valid re-subscription")
1872 headers = { "NT": "upnp:event",
1873 "sid": sid,
1874 "timeout": "Second-1234" }
1875 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1876 resp = conn.getresponse()
1877 if resp.status != 200:
1878 raise Exception("Unexpected HTTP response: %s" % resp.status)
1879 sid2 = resp.getheader("sid")
1880 logger.debug("Subscription SID " + sid2)
1881
1882 if sid != sid2:
1883 raise Exception("Unexpected SID change")
1884
1885 logger.debug("Valid re-subscription")
1886 headers = { "NT": "upnp:event",
1887 "sid": "uuid: \t \t" + sid.split(':')[1],
1888 "timeout": "Second-1234" }
1889 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1890 resp = conn.getresponse()
1891 if resp.status != 200:
1892 raise Exception("Unexpected HTTP response: %s" % resp.status)
1893
1894 logger.debug("Invalid unsubscription")
1895 headers = { "sid": sid }
1896 conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers)
1897 resp = conn.getresponse()
1898 if resp.status != 412:
1899 raise Exception("Unexpected HTTP response: %s" % resp.status)
1900 headers = { "foo": "bar" }
1901 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1902 resp = conn.getresponse()
1903 if resp.status != 412:
1904 raise Exception("Unexpected HTTP response: %s" % resp.status)
1905
1906 logger.debug("Valid unsubscription")
1907 headers = { "sid": sid }
1908 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1909 resp = conn.getresponse()
1910 if resp.status != 200:
1911 raise Exception("Unexpected HTTP response: %s" % resp.status)
1912
1913 logger.debug("Unsubscription for not existing SID")
1914 headers = { "sid": sid }
1915 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1916 resp = conn.getresponse()
1917 if resp.status != 412:
1918 raise Exception("Unexpected HTTP response: %s" % resp.status)
1919
1920 logger.debug("Invalid unsubscription")
1921 headers = { "sid": " \t \tfoo" }
1922 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1923 resp = conn.getresponse()
1924 if resp.status != 400:
1925 raise Exception("Unexpected HTTP response: %s" % resp.status)
1926
1927 logger.debug("Invalid unsubscription")
1928 headers = { "sid": "uuid:\t \tfoo" }
1929 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1930 resp = conn.getresponse()
1931 if resp.status != 400:
1932 raise Exception("Unexpected HTTP response: %s" % resp.status)
1933
1934 logger.debug("Invalid unsubscription")
1935 headers = { "NT": "upnp:event",
1936 "sid": sid }
1937 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1938 resp = conn.getresponse()
1939 if resp.status != 400:
1940 raise Exception("Unexpected HTTP response: %s" % resp.status)
1941 headers = { "callback": '<http://127.0.0.1:12345/event>',
1942 "sid": sid }
1943 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1944 resp = conn.getresponse()
1945 if resp.status != 400:
1946 raise Exception("Unexpected HTTP response: %s" % resp.status)
1947
1948 logger.debug("Valid subscription with multiple callbacks")
1949 headers = { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
1950 "NT": "upnp:event",
1951 "timeout": "Second-1234" }
1952 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1953 resp = conn.getresponse()
1954 if resp.status != 200:
1955 raise Exception("Unexpected HTTP response: %s" % resp.status)
1956 sid = resp.getheader("sid")
1957 logger.debug("Subscription SID " + sid)