X-Git-Url: http://git.ipfire.org/?a=blobdiff_plain;f=tests%2Fhwsim%2Ftest_ap_wps.py;h=33d6ffb994239dbcdad273e1983d29189a6405a5;hb=97d2d7ac1a45cddd0bd34de224050d614737ffbd;hp=32d2e634a2a5b7982a66c27569a7f1ae2afecb70;hpb=bc6e32880f194845d874709bfa12556352bed370;p=thirdparty%2Fhostap.git diff --git a/tests/hwsim/test_ap_wps.py b/tests/hwsim/test_ap_wps.py index 32d2e634a..33d6ffb99 100644 --- a/tests/hwsim/test_ap_wps.py +++ b/tests/hwsim/test_ap_wps.py @@ -1,10 +1,11 @@ # WPS tests -# Copyright (c) 2013-2015, Jouni Malinen +# Copyright (c) 2013-2017, Jouni Malinen # # This software may be distributed under the terms of the BSD license. # See README for more details. from remotehost import remote_compatible +from tshark import run_tshark import base64 import binascii from Crypto.Cipher import AES @@ -12,6 +13,7 @@ import hashlib import hmac import os import time +import sys import stat import subprocess import logging @@ -19,24 +21,34 @@ logger = logging.getLogger() import re import socket import struct -import httplib -import urlparse +try: + from http.client import HTTPConnection + from urllib.request import urlopen + from urllib.parse import urlparse, urljoin + from urllib.error import HTTPError + from io import StringIO + from socketserver import StreamRequestHandler, TCPServer +except ImportError: + from httplib import HTTPConnection + from urllib import urlopen + from urlparse import urlparse, urljoin + from urllib2 import build_opener, ProxyHandler, HTTPError + from StringIO import StringIO + from SocketServer import StreamRequestHandler, TCPServer import urllib import xml.etree.ElementTree as ET -import StringIO -import SocketServer import hwsim_utils import hostapd from wpasupplicant import WpaSupplicant from utils import HwsimSkip, alloc_fail, fail_test, skip_with_fips -from utils import wait_fail_trigger +from utils import wait_fail_trigger, clear_regdom from test_ap_eap import int_eap_server_params def wps_start_ap(apdev, ssid="test-wps-conf"): - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"} return hostapd.add_ap(apdev, params) @remote_compatible @@ -44,7 +56,7 @@ def test_ap_wps_init(dev, apdev): """Initial AP configuration with first WPS Enrollee""" ssid = "test-wps" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "1" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "1"}) logger.info("WPS provisioning step") hapd.request("WPS_PBC") if "PBC Status: Active" not in hapd.request("WPS_GET_STATUS"): @@ -98,7 +110,7 @@ def test_ap_wps_init(dev, apdev): def test_ap_wps_init_2ap_pbc(dev, apdev): """Initial two-radio AP configuration with first WPS PBC Enrollee""" ssid = "test-wps" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "1"} hapd = hostapd.add_ap(apdev[0], params) hostapd.add_ap(apdev[1], params) logger.info("WPS provisioning step") @@ -134,7 +146,7 @@ def test_ap_wps_init_2ap_pbc(dev, apdev): def test_ap_wps_init_2ap_pin(dev, apdev): """Initial two-radio AP configuration with first WPS PIN Enrollee""" ssid = "test-wps" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "1"} hapd = hostapd.add_ap(apdev[0], params) hostapd.add_ap(apdev[1], params) logger.info("WPS provisioning step") @@ -166,8 +178,8 @@ def test_ap_wps_init_through_wps_config(dev, apdev): """Initial AP configuration using wps_config command""" ssid = "test-wps-init-config" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "1" }) - if "FAIL" in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")): + {"ssid": ssid, "eap_server": "1", "wps_state": "1"}) + if "FAIL" in hapd.request("WPS_CONFIG " + binascii.hexlify(ssid.encode()).decode() + " WPA2PSK CCMP " + binascii.hexlify(b"12345678").decode()): raise Exception("WPS_CONFIG command failed") ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5) if ev is None: @@ -180,14 +192,17 @@ def test_ap_wps_init_through_wps_config(dev, apdev): dev[0].connect(ssid, psk="12345678", scan_freq="2412", proto="WPA2", pairwise="CCMP", group="CCMP") + if "FAIL" not in hapd.request("WPS_CONFIG foo"): + raise Exception("Invalid WPS_CONFIG accepted") + @remote_compatible def test_ap_wps_init_through_wps_config_2(dev, apdev): """AP configuration using wps_config and wps_cred_processing=2""" ssid = "test-wps-init-config" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "1", - "wps_cred_processing": "2" }) - if "FAIL" in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")): + {"ssid": ssid, "eap_server": "1", "wps_state": "1", + "wps_cred_processing": "2"}) + if "FAIL" in hapd.request("WPS_CONFIG " + binascii.hexlify(ssid.encode()).decode() + " WPA2PSK CCMP " + binascii.hexlify(b"12345678").decode()): raise Exception("WPS_CONFIG command failed") ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5) if ev is None: @@ -200,17 +215,17 @@ def test_ap_wps_invalid_wps_config_passphrase(dev, apdev): """AP configuration using wps_config command with invalid passphrase""" ssid = "test-wps-init-config" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "1" }) - if "FAIL" not in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "1234567".encode("hex")): + {"ssid": ssid, "eap_server": "1", "wps_state": "1"}) + if "FAIL" not in hapd.request("WPS_CONFIG " + binascii.hexlify(ssid.encode()).decode() + " WPA2PSK CCMP " + binascii.hexlify(b"1234567").decode()): raise Exception("Invalid WPS_CONFIG command accepted") def test_ap_wps_conf(dev, apdev): """WPS PBC provisioning with configured AP""" ssid = "test-wps-conf" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning step") hapd.request("WPS_PBC") dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") @@ -238,10 +253,10 @@ def test_ap_wps_conf_5ghz(dev, apdev): try: hapd = None ssid = "test-wps-conf" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "country_code": "FI", "hw_mode": "a", "channel": "36" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "country_code": "FI", "hw_mode": "a", "channel": "36"} hapd = hostapd.add_ap(apdev[0], params) logger.info("WPS provisioning step") hapd.request("WPS_PBC") @@ -254,20 +269,17 @@ def test_ap_wps_conf_5ghz(dev, apdev): raise Exception("Device name not available in STA command") finally: dev[0].request("DISCONNECT") - if hapd: - hapd.request("DISABLE") - subprocess.call(['iw', 'reg', 'set', '00']) - dev[0].flush_scan_cache() + clear_regdom(hapd, dev) def test_ap_wps_conf_chan14(dev, apdev): """WPS PBC provisioning with configured AP on channel 14""" try: hapd = None ssid = "test-wps-conf" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "country_code": "JP", "hw_mode": "b", "channel": "14" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "country_code": "JP", "hw_mode": "b", "channel": "14"} hapd = hostapd.add_ap(apdev[0], params) logger.info("WPS provisioning step") hapd.request("WPS_PBC") @@ -279,18 +291,15 @@ def test_ap_wps_conf_chan14(dev, apdev): raise Exception("Device name not available in STA command") finally: dev[0].request("DISCONNECT") - if hapd: - hapd.request("DISABLE") - subprocess.call(['iw', 'reg', 'set', '00']) - dev[0].flush_scan_cache() + clear_regdom(hapd, dev) @remote_compatible def test_ap_wps_twice(dev, apdev): """WPS provisioning with twice to change passphrase""" ssid = "test-wps-twice" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"} hapd = hostapd.add_ap(apdev[0], params) logger.info("WPS provisioning step") hapd.request("WPS_PBC") @@ -318,9 +327,9 @@ def test_ap_wps_incorrect_pin(dev, apdev): """WPS PIN provisioning with incorrect PIN""" ssid = "test-wps-incorrect-pin" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning attempt 1") hapd.request("WPS_PIN any 12345670") @@ -361,9 +370,9 @@ def test_ap_wps_conf_pin(dev, apdev): """WPS PIN provisioning with configured AP""" ssid = "test-wps-conf-pin" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning step") pin = dev[0].wps_read_pin() hapd.request("WPS_PIN any " + pin) @@ -387,7 +396,7 @@ def test_ap_wps_conf_pin(dev, apdev): raise Exception("WPS-AUTH flag not cleared") logger.info("Try to connect from another station using the same PIN") pin = dev[1].request("WPS_PIN " + apdev[0]['bssid']) - ev = dev[1].wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"], timeout=30) + ev = dev[1].wait_event(["WPS-M2D", "CTRL-EVENT-CONNECTED"], timeout=30) if ev is None: raise Exception("Operation timed out") if "WPS-M2D" not in ev: @@ -399,10 +408,10 @@ def test_ap_wps_conf_pin_mixed_mode(dev, apdev): """WPS PIN provisioning with configured AP (WPA+WPA2)""" ssid = "test-wps-conf-pin-mixed" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "3", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "wpa_pairwise": "TKIP" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "3", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wpa_pairwise": "TKIP"}) logger.info("WPS provisioning step") pin = dev[0].wps_read_pin() @@ -457,9 +466,9 @@ def test_ap_wps_conf_pin_v1(dev, apdev): """WPS PIN provisioning with configured WPS v1.0 AP""" ssid = "test-wps-conf-pin-v1" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning step") pin = dev[0].wps_read_pin() hapd.request("SET wps_version_number 0x10") @@ -483,9 +492,9 @@ def test_ap_wps_conf_pin_2sta(dev, apdev): """Two stations trying to use WPS PIN at the same time""" ssid = "test-wps-conf-pin2" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning step") pin = "12345670" pin2 = "55554444" @@ -505,9 +514,9 @@ def test_ap_wps_conf_pin_timeout(dev, apdev): """WPS PIN provisioning with configured AP timing out PIN""" ssid = "test-wps-conf-pin" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) addr = dev[0].p2p_interface_addr() pin = dev[0].wps_read_pin() if "FAIL" not in hapd.request("WPS_PIN "): @@ -533,10 +542,10 @@ def test_ap_wps_reg_connect(dev, apdev): ssid = "test-wps-reg-ap-pin" appin = "12345670" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "ap_pin": appin}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "ap_pin": appin}) logger.info("WPS provisioning step") dev[0].dump_monitor() dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) @@ -551,15 +560,34 @@ def test_ap_wps_reg_connect(dev, apdev): if status['key_mgmt'] != 'WPA2-PSK': raise Exception("Unexpected key_mgmt") +def test_ap_wps_reg_connect_zero_len_ap_pin(dev, apdev): + """hostapd with zero length ap_pin parameter""" + ssid = "test-wps-reg-ap-pin" + appin = "" + hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "ap_pin": appin}) + logger.info("WPS provisioning step") + dev[0].dump_monitor() + dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) + dev[0].wps_reg(apdev[0]['bssid'], appin, no_wait=True) + ev = dev[0].wait_event(["WPS-FAIL"], timeout=15) + if ev is None: + raise Exception("No WPS-FAIL reported") + if "msg=5 config_error=15" not in ev: + raise Exception("Unexpected WPS-FAIL: " + ev) + def test_ap_wps_reg_connect_mixed_mode(dev, apdev): """WPS registrar using AP PIN to connect (WPA+WPA2)""" ssid = "test-wps-reg-ap-pin" appin = "12345670" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "3", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "wpa_pairwise": "TKIP", "ap_pin": appin}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "3", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wpa_pairwise": "TKIP", "ap_pin": appin}) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], appin) status = dev[0].get_status() @@ -580,22 +608,22 @@ def test_ap_wps_reg_override_ap_settings(dev, apdev): except: pass # Override AP Settings with values that point to another AP - data = build_wsc_attr(ATTR_NETWORK_INDEX, '\x01') - data += build_wsc_attr(ATTR_SSID, "test") - data += build_wsc_attr(ATTR_AUTH_TYPE, '\x00\x01') - data += build_wsc_attr(ATTR_ENCR_TYPE, '\x00\x01') - data += build_wsc_attr(ATTR_NETWORK_KEY, '') + data = build_wsc_attr(ATTR_NETWORK_INDEX, b'\x01') + data += build_wsc_attr(ATTR_SSID, b"test") + data += build_wsc_attr(ATTR_AUTH_TYPE, b'\x00\x01') + data += build_wsc_attr(ATTR_ENCR_TYPE, b'\x00\x01') + data += build_wsc_attr(ATTR_NETWORK_KEY, b'') data += build_wsc_attr(ATTR_MAC_ADDR, binascii.unhexlify(apdev[1]['bssid'].replace(':', ''))) - with open(ap_settings, "w") as f: + with open(ap_settings, "wb") as f: f.write(data) ssid = "test-wps-reg-ap-pin" appin = "12345670" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "ap_pin": appin, "ap_settings": ap_settings }) - hapd2 = hostapd.add_ap(apdev[1], { "ssid": "test" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "ap_pin": appin, "ap_settings": ap_settings}) + hapd2 = hostapd.add_ap(apdev[1], {"ssid": "test"}) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].scan_for_bss(apdev[1]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], appin) @@ -618,15 +646,15 @@ def test_ap_wps_random_ap_pin(dev, apdev): """WPS registrar using random AP PIN""" ssid = "test-wps-reg-random-ap-pin" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "uuid": ap_uuid, "upnp_iface": "lo" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "uuid": ap_uuid, "upnp_iface": "lo"} hapd = hostapd.add_ap(apdev[0], params) appin = hapd.request("WPS_AP_PIN random") if "FAIL" in appin: @@ -676,13 +704,18 @@ def test_ap_wps_random_ap_pin(dev, apdev): hapd.request("WPS_AP_PIN set 12345670") hapd.request("WPS_AP_PIN disable") + if "FAIL" not in hapd.request("WPS_AP_PIN set"): + raise Exception("Invalid WPS_AP_PIN accepted") + if "FAIL" not in hapd.request("WPS_AP_PIN foo"): + raise Exception("Invalid WPS_AP_PIN accepted") + def test_ap_wps_reg_config(dev, apdev): """WPS registrar configuring an AP using AP PIN""" ssid = "test-wps-init-ap-pin" appin = "12345670" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "ap_pin": appin}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "ap_pin": appin}) logger.info("WPS configuration step") dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].dump_monitor() @@ -717,8 +750,8 @@ def test_ap_wps_reg_config_ext_processing(dev, apdev): """WPS registrar configuring an AP with external config processing""" ssid = "test-wps-init-ap-pin" appin = "12345670" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wps_cred_processing": "1", "ap_pin": appin} + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wps_cred_processing": "1", "ap_pin": appin} hapd = hostapd.add_ap(apdev[0], params) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) new_ssid = "wps-new-ssid" @@ -734,7 +767,7 @@ def test_ap_wps_reg_config_ext_processing(dev, apdev): if "1026" not in ev: raise Exception("AP Settings missing from event") hapd.request("SET wps_cred_processing 0") - if "FAIL" in hapd.request("WPS_CONFIG " + new_ssid.encode("hex") + " WPA2PSK CCMP " + new_passphrase.encode("hex")): + if "FAIL" in hapd.request("WPS_CONFIG " + binascii.hexlify(new_ssid.encode()).decode() + " WPA2PSK CCMP " + binascii.hexlify(new_passphrase.encode()).decode()): raise Exception("WPS_CONFIG command failed") dev[0].wait_connected(timeout=15) @@ -744,8 +777,8 @@ def test_ap_wps_reg_config_tkip(dev, apdev): ssid = "test-wps-init-ap" appin = "12345670" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "1", - "ap_pin": appin}) + {"ssid": ssid, "eap_server": "1", "wps_state": "1", + "ap_pin": appin}) logger.info("WPS configuration step") dev[0].request("SET wps_version_number 0x10") dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) @@ -775,15 +808,15 @@ def test_ap_wps_setup_locked(dev, apdev): ssid = "test-wps-incorrect-ap-pin" appin = "12345670" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "ap_pin": appin}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "ap_pin": appin}) new_ssid = "wps-new-ssid-test" new_passphrase = "1234567890" dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) - ap_setup_locked=False + ap_setup_locked = False for pin in ["55554444", "1234", "12345678", "00000000", "11111111"]: dev[0].dump_monitor() logger.info("Try incorrect AP PIN - attempt " + pin) @@ -796,7 +829,7 @@ def test_ap_wps_setup_locked(dev, apdev): raise Exception("Unexpected connection") if "config_error=15" in ev: logger.info("AP Setup Locked") - ap_setup_locked=True + ap_setup_locked = True elif "config_error=18" not in ev: raise Exception("config_error=18 not reported") dev[0].wait_disconnected(timeout=10) @@ -840,15 +873,15 @@ def test_ap_wps_setup_locked_timeout(dev, apdev): ssid = "test-wps-incorrect-ap-pin" appin = "12345670" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "ap_pin": appin}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "ap_pin": appin}) new_ssid = "wps-new-ssid-test" new_passphrase = "1234567890" dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) - ap_setup_locked=False + ap_setup_locked = False for pin in ["55554444", "1234", "12345678", "00000000", "11111111"]: dev[0].dump_monitor() logger.info("Try incorrect AP PIN - attempt " + pin) @@ -861,7 +894,7 @@ def test_ap_wps_setup_locked_timeout(dev, apdev): raise Exception("Unexpected connection") if "config_error=15" in ev: logger.info("AP Setup Locked") - ap_setup_locked=True + ap_setup_locked = True break elif "config_error=18" not in ev: raise Exception("config_error=18 not reported") @@ -877,10 +910,10 @@ def test_ap_wps_setup_locked_2(dev, apdev): """WPS AP configured for special ap_setup_locked=2 mode""" ssid = "test-wps-ap-pin" appin = "12345670" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "ap_pin": appin, "ap_setup_locked": "2" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "ap_pin": appin, "ap_setup_locked": "2"} hapd = hostapd.add_ap(apdev[0], params) new_ssid = "wps-new-ssid-test" new_passphrase = "1234567890" @@ -914,15 +947,15 @@ def test_ap_wps_setup_locked_2(dev, apdev): @remote_compatible def test_ap_wps_pbc_overlap_2ap(dev, apdev): """WPS PBC session overlap with two active APs""" - params = { "ssid": "wps1", "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "wps_independent": "1"} + params = {"ssid": "wps1", "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wps_independent": "1"} hapd = hostapd.add_ap(apdev[0], params) - params = { "ssid": "wps2", "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "123456789", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "wps_independent": "1"} + params = {"ssid": "wps2", "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "123456789", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wps_independent": "1"} hapd2 = hostapd.add_ap(apdev[1], params) hapd.request("WPS_PBC") hapd2.request("WPS_PBC") @@ -942,9 +975,9 @@ def test_ap_wps_pbc_overlap_2sta(dev, apdev): """WPS PBC session overlap with two active STAs""" ssid = "test-wps-pbc-overlap" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning step") hapd.request("WPS_PBC") dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") @@ -980,9 +1013,9 @@ def test_ap_wps_cancel(dev, apdev): """WPS AP cancelling enabled config method""" ssid = "test-wps-ap-cancel" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) bssid = apdev[0]['bssid'] logger.info("Verify PBC enable/cancel") @@ -1027,14 +1060,14 @@ def _test_ap_wps_er_add_enrollee(dev, apdev): ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "1", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - 'friendly_name': "WPS AP - <>&'\" - TEST", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "1", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + 'friendly_name': "WPS AP - <>&'\" - TEST", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) logger.info("WPS configuration step") new_passphrase = "1234567890" dev[0].dump_monitor() @@ -1160,15 +1193,15 @@ def _test_ap_wps_er_add_enrollee_uuid(dev, apdev): ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) logger.info("WPS configuration step") dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], ap_pin) @@ -1252,16 +1285,16 @@ def _test_ap_wps_er_multi_add_enrollee(dev, apdev): ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - 'friendly_name': "WPS AP", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + 'friendly_name': "WPS AP", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) for i in range(2): dev[i].scan_for_bss(apdev[0]['bssid'], freq=2412) @@ -1312,15 +1345,15 @@ def _test_ap_wps_er_add_enrollee_pbc(dev, apdev): ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) logger.info("Learn AP configuration") dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].dump_monitor() @@ -1383,15 +1416,15 @@ def _test_ap_wps_er_pbc_overlap(dev, apdev): ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].dump_monitor() dev[0].wps_reg(apdev[0]['bssid'], ap_pin) @@ -1458,15 +1491,15 @@ def _test_ap_wps_er_v10_add_enrollee_pin(dev, apdev): ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) logger.info("Learn AP configuration") dev[0].request("SET wps_version_number 0x10") dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) @@ -1513,15 +1546,15 @@ def _test_ap_wps_er_config_ap(dev, apdev): ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) logger.info("Connect ER to the AP") dev[0].connect(ssid, psk="12345678", scan_freq="2412") @@ -1535,8 +1568,8 @@ def _test_ap_wps_er_config_ap(dev, apdev): raise Exception("Expected AP UUID not found") new_passphrase = "1234567890" dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " + - ssid.encode("hex") + " WPA2PSK CCMP " + - new_passphrase.encode("hex")) + binascii.hexlify(ssid.encode()).decode() + " WPA2PSK CCMP " + + binascii.hexlify(new_passphrase.encode()).decode()) ev = dev[0].wait_event(["WPS-SUCCESS"]) if ev is None: raise Exception("WPS ER configuration operation timed out") @@ -1567,15 +1600,15 @@ def _test_ap_wps_er_cache_ap_settings(dev, apdev): ssid = "wps-er-add-enrollee" ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"} hapd = hostapd.add_ap(apdev[0], params) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], ap_pin) @@ -1602,15 +1635,14 @@ def _test_ap_wps_er_cache_ap_settings(dev, apdev): hapd.disable() for i in range(2): - ev = dev[0].wait_event([ "WPS-ER-AP-REMOVE", - "CTRL-EVENT-DISCONNECTED" ], + ev = dev[0].wait_event(["WPS-ER-AP-REMOVE", "CTRL-EVENT-DISCONNECTED"], timeout=15) if ev is None: raise Exception("AP removal or disconnection timed out") hapd = hostapd.add_ap(apdev[0], params) for i in range(2): - ev = dev[0].wait_event([ "WPS-ER-AP-ADD", "CTRL-EVENT-CONNECTED" ], + ev = dev[0].wait_event(["WPS-ER-AP-ADD", "CTRL-EVENT-CONNECTED"], timeout=15) if ev is None: raise Exception("AP discovery or connection timed out") @@ -1646,15 +1678,15 @@ def _test_ap_wps_er_cache_ap_settings_oom(dev, apdev): ssid = "wps-er-add-enrollee" ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"} hapd = hostapd.add_ap(apdev[0], params) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], ap_pin) @@ -1682,15 +1714,15 @@ def _test_ap_wps_er_cache_ap_settings_oom(dev, apdev): hapd.disable() for i in range(2): - ev = dev[0].wait_event([ "WPS-ER-AP-REMOVE", - "CTRL-EVENT-DISCONNECTED" ], + ev = dev[0].wait_event(["WPS-ER-AP-REMOVE", + "CTRL-EVENT-DISCONNECTED"], timeout=15) if ev is None: raise Exception("AP removal or disconnection timed out") hapd = hostapd.add_ap(apdev[0], params) for i in range(2): - ev = dev[0].wait_event([ "WPS-ER-AP-ADD", "CTRL-EVENT-CONNECTED" ], + ev = dev[0].wait_event(["WPS-ER-AP-ADD", "CTRL-EVENT-CONNECTED"], timeout=15) if ev is None: raise Exception("AP discovery or connection timed out") @@ -1708,15 +1740,15 @@ def _test_ap_wps_er_cache_ap_settings_oom2(dev, apdev): ssid = "wps-er-add-enrollee" ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"} hapd = hostapd.add_ap(apdev[0], params) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], ap_pin) @@ -1744,15 +1776,15 @@ def _test_ap_wps_er_cache_ap_settings_oom2(dev, apdev): hapd.disable() for i in range(2): - ev = dev[0].wait_event([ "WPS-ER-AP-REMOVE", - "CTRL-EVENT-DISCONNECTED" ], + ev = dev[0].wait_event(["WPS-ER-AP-REMOVE", + "CTRL-EVENT-DISCONNECTED"], timeout=15) if ev is None: raise Exception("AP removal or disconnection timed out") hapd = hostapd.add_ap(apdev[0], params) for i in range(2): - ev = dev[0].wait_event([ "WPS-ER-AP-ADD", "CTRL-EVENT-CONNECTED" ], + ev = dev[0].wait_event(["WPS-ER-AP-ADD", "CTRL-EVENT-CONNECTED"], timeout=15) if ev is None: raise Exception("AP discovery or connection timed out") @@ -1770,15 +1802,15 @@ def _test_ap_wps_er_subscribe_oom(dev, apdev): ssid = "wps-er-add-enrollee" ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"} hapd = hostapd.add_ap(apdev[0], params) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], ap_pin) @@ -1809,15 +1841,15 @@ def _test_ap_wps_er_set_sel_reg_oom(dev, apdev): ssid = "wps-er-add-enrollee" ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"} hapd = hostapd.add_ap(apdev[0], params) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], ap_pin) @@ -1836,10 +1868,10 @@ def _test_ap_wps_er_set_sel_reg_oom(dev, apdev): raise Exception("WPS-FAIL timed out") time.sleep(0.1) - for func in [ "http_client_url_parse;wps_er_send_set_sel_reg", - "wps_er_soap_hdr;wps_er_send_set_sel_reg", - "http_client_addr;wps_er_send_set_sel_reg", - "wpabuf_alloc;wps_er_set_sel_reg" ]: + for func in ["http_client_url_parse;wps_er_send_set_sel_reg", + "wps_er_soap_hdr;wps_er_send_set_sel_reg", + "http_client_addr;wps_er_send_set_sel_reg", + "wpabuf_alloc;wps_er_set_sel_reg"]: with alloc_fail(dev[0], 1, func): if "OK" not in dev[0].request("WPS_ER_PBC " + ap_uuid): raise Exception("WPS_ER_PBC failed") @@ -1861,15 +1893,15 @@ def _test_ap_wps_er_learn_oom(dev, apdev): ssid = "wps-er-add-enrollee" ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"} hapd = hostapd.add_ap(apdev[0], params) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], ap_pin) @@ -1879,11 +1911,11 @@ def _test_ap_wps_er_learn_oom(dev, apdev): if ev is None: raise Exception("AP not discovered") - for func in [ "wps_er_http_put_message_cb", - "xml_get_base64_item;wps_er_http_put_message_cb", - "http_client_url_parse;wps_er_ap_put_message", - "wps_er_soap_hdr;wps_er_ap_put_message", - "http_client_addr;wps_er_ap_put_message" ]: + for func in ["wps_er_http_put_message_cb", + "xml_get_base64_item;wps_er_http_put_message_cb", + "http_client_url_parse;wps_er_ap_put_message", + "wps_er_soap_hdr;wps_er_ap_put_message", + "http_client_addr;wps_er_ap_put_message"]: with alloc_fail(dev[0], 1, func): dev[0].request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin) ev = dev[0].wait_event(["WPS-ER-AP-SETTINGS"], timeout=1) @@ -1905,11 +1937,11 @@ def test_ap_wps_fragmentation(dev, apdev): ssid = "test-wps-fragmentation" appin = "12345670" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "3", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "wpa_pairwise": "TKIP", "ap_pin": appin, - "fragment_size": "50" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "3", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wpa_pairwise": "TKIP", "ap_pin": appin, + "fragment_size": "50"}) logger.info("WPS provisioning step (PBC)") hapd.request("WPS_PBC") dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) @@ -1957,9 +1989,9 @@ def test_ap_wps_new_version_sta(dev, apdev): """WPS compatibility with new version number on the station""" ssid = "test-wps-ver" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning step") hapd.request("WPS_PBC") dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") @@ -1974,9 +2006,9 @@ def test_ap_wps_new_version_ap(dev, apdev): """WPS compatibility with new version number on the AP""" ssid = "test-wps-ver" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning step") if "FAIL" in hapd.request("SET wps_version_number 0x43"): raise Exception("Failed to enable test functionality") @@ -1991,16 +2023,16 @@ def test_ap_wps_new_version_ap(dev, apdev): def test_ap_wps_check_pin(dev, apdev): """Verify PIN checking through control interface""" hapd = hostapd.add_ap(apdev[0], - { "ssid": "wps", "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }) - for t in [ ("12345670", "12345670"), - ("12345678", "FAIL-CHECKSUM"), - ("12345", "FAIL"), - ("123456789", "FAIL"), - ("1234-5670", "12345670"), - ("1234 5670", "12345670"), - ("1-2.3:4 5670", "12345670") ]: + {"ssid": "wps", "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + for t in [("12345670", "12345670"), + ("12345678", "FAIL-CHECKSUM"), + ("12345", "FAIL"), + ("123456789", "FAIL"), + ("1234-5670", "12345670"), + ("1234 5670", "12345670"), + ("1-2.3:4 5670", "12345670")]: res = hapd.request("WPS_CHECK_PIN " + t[0]).rstrip('\n') res2 = dev[0].request("WPS_CHECK_PIN " + t[0]).rstrip('\n') if res != res2: @@ -2019,13 +2051,20 @@ def test_ap_wps_check_pin(dev, apdev): if pin != rpin: raise Exception("Random PIN validation failed for " + pin) +def test_ap_wps_pin_get_failure(dev, apdev): + """PIN generation failure""" + with fail_test(dev[0], 1, + "os_get_random;wpa_supplicant_ctrl_iface_wps_pin"): + if "FAIL" not in dev[0].request("WPS_PIN get"): + raise Exception("WPS_PIN did not report failure") + def test_ap_wps_wep_config(dev, apdev): """WPS 2.0 AP rejecting WEP configuration""" ssid = "test-wps-config" appin = "12345670" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "ap_pin": appin}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "ap_pin": appin}) dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN", "WEP", "hello", no_wait=True) @@ -2045,8 +2084,8 @@ def test_ap_wps_wep_config(dev, apdev): def test_ap_wps_wep_enroll(dev, apdev): """WPS 2.0 STA rejecting WEP configuration""" ssid = "test-wps-wep" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "skip_cred_build": "1", "extra_cred": "wps-wep-cred" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "skip_cred_build": "1", "extra_cred": "wps-wep-cred"} hapd = hostapd.add_ap(apdev[0], params) hapd.request("WPS_PBC") dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) @@ -2061,14 +2100,14 @@ def test_ap_wps_wep_enroll(dev, apdev): def test_ap_wps_ie_fragmentation(dev, apdev): """WPS AP using fragmented WPS IE""" ssid = "test-wps-ie-fragmentation" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "1234567890abcdef1234567890abcdef", - "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", - "model_name": "1234567890abcdef1234567890abcdef", - "model_number": "1234567890abcdef1234567890abcdef", - "serial_number": "1234567890abcdef1234567890abcdef" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "1234567890abcdef1234567890abcdef", + "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", + "model_name": "1234567890abcdef1234567890abcdef", + "model_number": "1234567890abcdef1234567890abcdef", + "serial_number": "1234567890abcdef1234567890abcdef"} hapd = hostapd.add_ap(apdev[0], params) hapd.request("WPS_PBC") dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") @@ -2097,7 +2136,7 @@ def get_psk(pskfile): for l in lines: if l == "# WPA PSKs": continue - (addr,psk) = l.split(' ') + (addr, psk) = l.split(' ') psks[addr] = psk return psks @@ -2119,10 +2158,10 @@ def test_ap_wps_per_station_psk(dev, apdev): with open(pskfile, "w") as f: f.write("# WPA PSKs\n") - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa": "2", "wpa_key_mgmt": "WPA-PSK", - "rsn_pairwise": "CCMP", "ap_pin": appin, - "wpa_psk_file": pskfile } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa": "2", "wpa_key_mgmt": "WPA-PSK", + "rsn_pairwise": "CCMP", "ap_pin": appin, + "wpa_psk_file": pskfile} hapd = hostapd.add_ap(apdev[0], params) logger.info("First enrollee") @@ -2189,14 +2228,15 @@ def test_ap_wps_per_station_psk_failure(dev, apdev): except: pass + hapd = None try: with open(pskfile, "w") as f: f.write("# WPA PSKs\n") - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa": "2", "wpa_key_mgmt": "WPA-PSK", - "rsn_pairwise": "CCMP", "ap_pin": appin, - "wpa_psk_file": pskfile } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa": "2", "wpa_key_mgmt": "WPA-PSK", + "rsn_pairwise": "CCMP", "ap_pin": appin, + "wpa_psk_file": pskfile} hapd = hostapd.add_ap(apdev[0], params) if "FAIL" in hapd.request("SET wpa_psk_file /tmp/does/not/exists/ap_wps_per_enrollee_psk_failure.psk_file"): raise Exception("Failed to set wpa_psk_file") @@ -2222,6 +2262,12 @@ def test_ap_wps_per_station_psk_failure(dev, apdev): if len(psks) > 0: raise Exception("PSK recorded unexpectedly") finally: + if hapd: + for i in range(3): + dev[i].request("DISCONNECT") + hapd.disable() + for i in range(3): + dev[i].flush_scan_cache() os.remove(pskfile) def test_ap_wps_pin_request_file(dev, apdev): @@ -2231,10 +2277,10 @@ def test_ap_wps_pin_request_file(dev, apdev): if os.path.exists(pinfile): os.remove(pinfile) hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wps_pin_requests": pinfile, - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wps_pin_requests": pinfile, + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) uuid = dev[0].get_status_field("uuid") pin = dev[0].wps_read_pin() try: @@ -2286,9 +2332,9 @@ def test_ap_wps_auto_setup_with_config_file(dev, apdev): vals = dict() for l in lines: try: - [name,value] = l.split('=', 1) + [name, value] = l.split('=', 1) vals[name] = value - except ValueError, e: + except ValueError as e: if "# WPS configuration" in l: pass else: @@ -2310,13 +2356,13 @@ def test_ap_wps_pbc_timeout(dev, apdev, params): location = ssdp_get_location(ap_uuid) urls = upnp_get_urls(location) - eventurl = urlparse.urlparse(urls['event_sub_url']) - ctrlurl = urlparse.urlparse(urls['control_url']) + eventurl = urlparse(urls['event_sub_url']) + ctrlurl = urlparse(urls['control_url']) - url = urlparse.urlparse(location) - conn = httplib.HTTPConnection(url.netloc) + url = urlparse(location) + conn = HTTPConnection(url.netloc) - class WPSERHTTPServer(SocketServer.StreamRequestHandler): + class WPSERHTTPServer(StreamRequestHandler): def handle(self): data = self.rfile.readline().strip() logger.debug(data) @@ -2325,9 +2371,9 @@ def test_ap_wps_pbc_timeout(dev, apdev, params): server = MyTCPServer(("127.0.0.1", 12345), WPSERHTTPServer) server.timeout = 1 - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: @@ -2345,7 +2391,7 @@ VFi5hrLk ''' - headers = { "Content-type": 'text/xml; charset="utf-8"' } + headers = {"Content-type": 'text/xml; charset="utf-8"'} headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % "SetSelectedRegistrar" conn.request("POST", ctrlurl.path, msg, headers) resp = conn.getresponse() @@ -2413,20 +2459,20 @@ VFi5hrLk def add_ssdp_ap(ap, ap_uuid): ssid = "wps-ssdp" ap_pin = "12345670" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo", - "friendly_name": "WPS Access Point", - "manufacturer_url": "http://www.example.com/", - "model_description": "Wireless Access Point", - "model_url": "http://www.example.com/model/", - "upc": "123456789012" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo", + "friendly_name": "WPS Access Point", + "manufacturer_url": "http://www.example.com/", + "model_description": "Wireless Access Point", + "model_url": "http://www.example.com/model/", + "upc": "123456789012"} return hostapd.add_ap(ap, params) def ssdp_send(msg, no_recv=False): @@ -2435,10 +2481,10 @@ def ssdp_send(msg, no_recv=False): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) sock.bind(("127.0.0.1", 0)) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) if no_recv: return None - return sock.recv(1000) + return sock.recv(1000).decode() def ssdp_send_msearch(st, no_recv=False): msg = '\r\n'.join([ @@ -2506,7 +2552,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MAN: "ssdp:discover"', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Negative MX") msg = '\r\n'.join([ @@ -2516,7 +2562,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MAN: "ssdp:discover"', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Invalid MX") msg = '\r\n'.join([ @@ -2526,7 +2572,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MAN: "ssdp:discover"', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Missing MAN") msg = '\r\n'.join([ @@ -2535,7 +2581,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Invalid MAN") msg = '\r\n'.join([ @@ -2545,7 +2591,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MAN: foo', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) msg = '\r\n'.join([ 'M-SEARCH * HTTP/1.1', 'HOST: 239.255.255.250:1900', @@ -2553,7 +2599,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MAN; "ssdp:discover"', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Missing HOST") msg = '\r\n'.join([ @@ -2562,7 +2608,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Missing ST") msg = '\r\n'.join([ @@ -2571,7 +2617,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MAN: "ssdp:discover"', 'MX: 1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Mismatching ST") msg = '\r\n'.join([ @@ -2581,7 +2627,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) msg = '\r\n'.join([ 'M-SEARCH * HTTP/1.1', 'HOST: 239.255.255.250:1900', @@ -2589,7 +2635,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: foo:bar', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) msg = '\r\n'.join([ 'M-SEARCH * HTTP/1.1', 'HOST: 239.255.255.250:1900', @@ -2597,7 +2643,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: foobar', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Invalid ST") msg = '\r\n'.join([ @@ -2607,7 +2653,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST; urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Invalid M-SEARCH") msg = '\r\n'.join([ @@ -2617,7 +2663,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) msg = '\r\n'.join([ 'M-SEARCH-* HTTP/1.1', 'HOST: 239.255.255.250:1900', @@ -2625,10 +2671,10 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) logger.debug("Invalid message format") - sock.sendto("NOTIFY * HTTP/1.1", ("239.255.255.250", 1900)) + sock.sendto(b"NOTIFY * HTTP/1.1", ("239.255.255.250", 1900)) msg = '\r'.join([ 'M-SEARCH * HTTP/1.1', 'HOST: 239.255.255.250:1900', @@ -2636,7 +2682,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) try: r = sock.recv(1000) @@ -2652,7 +2698,7 @@ def test_ap_wps_ssdp_invalid_msearch(dev, apdev): 'MX: 1', 'ST: urn:schemas-wifialliance-org:device:WFADevice:1', '', '']) - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) try: r = sock.recv(1000) @@ -2678,11 +2724,11 @@ def test_ap_wps_ssdp_burst(dev, apdev): sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) sock.bind(("127.0.0.1", 0)) for i in range(0, 25): - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) resp = 0 while True: try: - r = sock.recv(1000) + r = sock.recv(1000).decode() if not r.startswith("HTTP/1.1 200 OK\r\n"): raise Exception("Unexpected message: " + r) resp += 1 @@ -2696,10 +2742,10 @@ def test_ap_wps_ssdp_burst(dev, apdev): sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) sock.bind(("127.0.0.1", 0)) for i in range(0, 25): - sock.sendto(msg, ("239.255.255.250", 1900)) + sock.sendto(msg.encode(), ("239.255.255.250", 1900)) while True: try: - r = sock.recv(1000) + r = sock.recv(1000).decode() if ap_uuid in r: break except socket.timeout: @@ -2717,15 +2763,20 @@ def ssdp_get_location(uuid): return location def upnp_get_urls(location): - conn = urllib.urlopen(location, proxies={}) + if sys.version_info[0] > 2: + conn = urlopen(location) + else: + conn = urlopen(location, proxies={}) tree = ET.parse(conn) root = tree.getroot() urn = '{urn:schemas-upnp-org:device-1-0}' service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service") res = {} - res['scpd_url'] = urlparse.urljoin(location, service.find(urn + 'SCPDURL').text) - res['control_url'] = urlparse.urljoin(location, service.find(urn + 'controlURL').text) - res['event_sub_url'] = urlparse.urljoin(location, service.find(urn + 'eventSubURL').text) + res['scpd_url'] = urljoin(location, service.find(urn + 'SCPDURL').text) + res['control_url'] = urljoin(location, + service.find(urn + 'controlURL').text) + res['event_sub_url'] = urljoin(location, + service.find(urn + 'eventSubURL').text) return res def upnp_soap_action(conn, path, action, include_soap_action=True, @@ -2742,23 +2793,21 @@ def upnp_soap_action(conn, path, action, include_soap_action=True, act = ET.SubElement(body, "{%s}%s" % (wpsns, action)) if newmsg: msg = ET.SubElement(act, "NewMessage") - msg.text = base64.b64encode(newmsg) + msg.text = base64.b64encode(newmsg.encode()).decode() if neweventtype: msg = ET.SubElement(act, "NewWLANEventType") msg.text = neweventtype if neweventmac: msg = ET.SubElement(act, "NewWLANEventMAC") msg.text = neweventmac - tree = ET.ElementTree(root) - soap = StringIO.StringIO() - tree.write(soap, xml_declaration=True, encoding='utf-8') - headers = { "Content-type": 'text/xml; charset="utf-8"' } + headers = {"Content-type": 'text/xml; charset="utf-8"'} if include_soap_action: headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action elif soap_action_override: headers["SOAPAction"] = soap_action_override - conn.request("POST", path, soap.getvalue(), headers) + decl = b'\n' + conn.request("POST", path, decl + ET.tostring(root), headers) return conn.getresponse() def test_ap_wps_upnp(dev, apdev): @@ -2769,19 +2818,29 @@ def test_ap_wps_upnp(dev, apdev): location = ssdp_get_location(ap_uuid) urls = upnp_get_urls(location) - conn = urllib.urlopen(urls['scpd_url'], proxies={}) + if sys.version_info[0] > 2: + conn = urlopen(urls['scpd_url']) + else: + conn = urlopen(urls['scpd_url'], proxies={}) scpd = conn.read() - conn = urllib.urlopen(urlparse.urljoin(location, "unknown.html"), - proxies={}) - if conn.getcode() != 404: - raise Exception("Unexpected HTTP response to GET unknown URL") + if sys.version_info[0] > 2: + try: + conn = urlopen(urljoin(location, "unknown.html")) + raise Exception("Unexpected HTTP response to GET unknown URL") + except HTTPError as e: + if e.code != 404: + raise Exception("Unexpected HTTP response to GET unknown URL") + else: + conn = urlopen(urljoin(location, "unknown.html"), proxies={}) + if conn.getcode() != 404: + raise Exception("Unexpected HTTP response to GET unknown URL") - url = urlparse.urlparse(location) - conn = httplib.HTTPConnection(url.netloc) + url = urlparse(location) + conn = HTTPConnection(url.netloc) #conn.set_debuglevel(1) - headers = { "Content-type": 'text/xml; charset="utf-8"', - "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' } + headers = {"Content-type": 'text/xml; charset="utf-8"', + "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"'} conn.request("POST", "hello", "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 404: @@ -2792,9 +2851,9 @@ def test_ap_wps_upnp(dev, apdev): if resp.status != 501: raise Exception("Unexpected HTTP response: %d" % resp.status) - headers = { "Content-type": 'text/xml; charset="utf-8"', - "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' } - ctrlurl = urlparse.urlparse(urls['control_url']) + headers = {"Content-type": 'text/xml; charset="utf-8"', + "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"'} + ctrlurl = urlparse(urls['control_url']) conn.request("POST", ctrlurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 401: @@ -2807,10 +2866,10 @@ def test_ap_wps_upnp(dev, apdev): raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("GetDeviceInfo with invalid SOAPAction header") - for act in [ "foo", - "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo", - '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"', - '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']: + for act in ["foo", + "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo", + '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"', + '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']: resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo", include_soap_action=False, soap_action_override=act) @@ -2820,7 +2879,7 @@ def test_ap_wps_upnp(dev, apdev): resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo") if resp.status != 200: raise Exception("Unexpected HTTP response: %d" % resp.status) - dev = resp.read() + dev = resp.read().decode() if "NewDeviceInfo" not in dev: raise Exception("Unexpected GetDeviceInfo response") @@ -2851,13 +2910,13 @@ def test_ap_wps_upnp_subscribe(dev, apdev): location = ssdp_get_location(ap_uuid) urls = upnp_get_urls(location) - eventurl = urlparse.urlparse(urls['event_sub_url']) + eventurl = urlparse(urls['event_sub_url']) - url = urlparse.urlparse(location) - conn = httplib.HTTPConnection(url.netloc) + url = urlparse(location) + conn = HTTPConnection(url.netloc) #conn.set_debuglevel(1) - headers = { "callback": '', - "timeout": "Second-1234" } + headers = {"callback": '', + "timeout": "Second-1234"} conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 412: @@ -2868,25 +2927,25 @@ def test_ap_wps_upnp_subscribe(dev, apdev): if resp.status != 412: raise Exception("Unexpected HTTP response: %d" % resp.status) - headers = { "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 412: raise Exception("Unexpected HTTP response: %d" % resp.status) - headers = { "callback": '', - "NT": "upnp:foobar", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:foobar", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 400: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Valid subscription") - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: @@ -2895,46 +2954,46 @@ def test_ap_wps_upnp_subscribe(dev, apdev): logger.debug("Subscription SID " + sid) logger.debug("Invalid re-subscription") - headers = { "NT": "upnp:event", - "sid": "123456734567854", - "timeout": "Second-1234" } + headers = {"NT": "upnp:event", + "sid": "123456734567854", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 400: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Invalid re-subscription") - headers = { "NT": "upnp:event", - "sid": "uuid:123456734567854", - "timeout": "Second-1234" } + headers = {"NT": "upnp:event", + "sid": "uuid:123456734567854", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 400: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Invalid re-subscription") - headers = { "callback": '', - "NT": "upnp:event", - "sid": sid, - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "sid": sid, + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 400: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("SID mismatch in re-subscription") - headers = { "NT": "upnp:event", - "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb", - "timeout": "Second-1234" } + headers = {"NT": "upnp:event", + "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 412: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Valid re-subscription") - headers = { "NT": "upnp:event", - "sid": sid, - "timeout": "Second-1234" } + headers = {"NT": "upnp:event", + "sid": sid, + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: @@ -2946,72 +3005,72 @@ def test_ap_wps_upnp_subscribe(dev, apdev): raise Exception("Unexpected SID change") logger.debug("Valid re-subscription") - headers = { "NT": "upnp:event", - "sid": "uuid: \t \t" + sid.split(':')[1], - "timeout": "Second-1234" } + headers = {"NT": "upnp:event", + "sid": "uuid: \t \t" + sid.split(':')[1], + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Invalid unsubscription") - headers = { "sid": sid } + headers = {"sid": sid} conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 412: raise Exception("Unexpected HTTP response: %d" % resp.status) - headers = { "foo": "bar" } + headers = {"foo": "bar"} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 412: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Valid unsubscription") - headers = { "sid": sid } + headers = {"sid": sid} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Unsubscription for not existing SID") - headers = { "sid": sid } + headers = {"sid": sid} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 412: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Invalid unsubscription") - headers = { "sid": " \t \tfoo" } + headers = {"sid": " \t \tfoo"} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 400: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Invalid unsubscription") - headers = { "sid": "uuid:\t \tfoo" } + headers = {"sid": "uuid:\t \tfoo"} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 400: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Invalid unsubscription") - headers = { "NT": "upnp:event", - "sid": sid } + headers = {"NT": "upnp:event", + "sid": sid} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 400: raise Exception("Unexpected HTTP response: %d" % resp.status) - headers = { "callback": '', - "sid": sid } + headers = {"callback": '', + "sid": sid} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 400: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.debug("Valid subscription with multiple callbacks") - headers = { "callback": ' \t', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": ' \t', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: @@ -3038,15 +3097,15 @@ def test_ap_wps_upnp_subscribe(dev, apdev): time.sleep(0.1) time.sleep(0.2) - headers = { "sid": sid } + headers = {"sid": sid} conn.request("UNSUBSCRIBE", eventurl.path, "", headers) resp = conn.getresponse() if resp.status != 200 and resp.status != 412: raise Exception("Unexpected HTTP response for UNSUBSCRIBE: %d" % resp.status) - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} with alloc_fail(hapd, 1, "http_client_addr;event_send_start"): conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() @@ -3055,15 +3114,15 @@ def test_ap_wps_upnp_subscribe(dev, apdev): sid = resp.getheader("sid") logger.debug("Subscription SID " + sid) - headers = { "sid": sid } + headers = {"sid": sid} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: raise Exception("Unexpected HTTP response for UNSUBSCRIBE: %d" % resp.status) - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: @@ -3114,25 +3173,25 @@ def test_ap_wps_upnp_subscribe(dev, apdev): if resp.status != 500: raise Exception("Unexpected HTTP response: %d" % resp.status) - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 500: raise Exception("Unexpected HTTP response: %d" % resp.status) - headers = { "callback": ' <', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": ' <', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 500: raise Exception("Unexpected HTTP response: %d" % resp.status) - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} with alloc_fail(hapd, 1, "wpabuf_alloc;subscription_first_event"): conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() @@ -3158,9 +3217,9 @@ def test_ap_wps_upnp_subscribe(dev, apdev): raise Exception("Unexpected HTTP response: %d" % resp.status) for i in range(6): - headers = { "callback": '' % (12345 + i), - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '' % (12345 + i), + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: @@ -3180,7 +3239,8 @@ def test_ap_wps_upnp_subscribe(dev, apdev): dev[1].request("WPS_CANCEL") time.sleep(0.1) - with alloc_fail(hapd, 1, "base64_encode;upnp_wps_device_send_wlan_event"): + with alloc_fail(hapd, 1, + "base64_gen_encode;?base64_encode;upnp_wps_device_send_wlan_event"): dev[1].dump_monitor() dev[1].request("WPS_PIN " + apdev[0]['bssid'] + " 12345670") dev[1].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5) @@ -3199,9 +3259,9 @@ def test_ap_wps_upnp_subscribe_events(dev, apdev): location = ssdp_get_location(ap_uuid) urls = upnp_get_urls(location) - eventurl = urlparse.urlparse(urls['event_sub_url']) + eventurl = urlparse(urls['event_sub_url']) - class WPSERHTTPServer(SocketServer.StreamRequestHandler): + class WPSERHTTPServer(StreamRequestHandler): def handle(self): data = self.rfile.readline().strip() logger.debug(data) @@ -3210,12 +3270,12 @@ def test_ap_wps_upnp_subscribe_events(dev, apdev): server = MyTCPServer(("127.0.0.1", 12345), WPSERHTTPServer) server.timeout = 1 - url = urlparse.urlparse(location) - conn = httplib.HTTPConnection(url.netloc) + url = urlparse(location) + conn = HTTPConnection(url.netloc) - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: @@ -3266,8 +3326,8 @@ def test_ap_wps_upnp_http_proto(dev, apdev): location = ssdp_get_location(ap_uuid) - url = urlparse.urlparse(location) - conn = httplib.HTTPConnection(url.netloc, timeout=0.2) + url = urlparse(location) + conn = HTTPConnection(url.netloc, timeout=0.2) #conn.set_debuglevel(1) conn.request("HEAD", "hello") @@ -3276,46 +3336,46 @@ def test_ap_wps_upnp_http_proto(dev, apdev): raise Exception("Unexpected response to HEAD: " + str(resp.status)) conn.close() - for cmd in [ "PUT", "DELETE", "TRACE", "CONNECT", "M-SEARCH", "M-POST" ]: + for cmd in ["PUT", "DELETE", "TRACE", "CONNECT", "M-SEARCH", "M-POST"]: try: conn.request(cmd, "hello") resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() - headers = { "Content-Length": 'abc' } + headers = {"Content-Length": 'abc'} conn.request("HEAD", "hello", "\r\n\r\n", headers) try: resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() - headers = { "Content-Length": '-10' } + headers = {"Content-Length": '-10'} conn.request("HEAD", "hello", "\r\n\r\n", headers) try: resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() - headers = { "Content-Length": '10000000000000' } + headers = {"Content-Length": '10000000000000'} conn.request("HEAD", "hello", "\r\n\r\nhello", headers) try: resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() - headers = { "Transfer-Encoding": 'abc' } + headers = {"Transfer-Encoding": 'abc'} conn.request("HEAD", "hello", "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 501: raise Exception("Unexpected response to HEAD: " + str(resp.status)) conn.close() - headers = { "Transfer-Encoding": 'chunked' } + headers = {"Transfer-Encoding": 'chunked'} conn.request("HEAD", "hello", "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 501: @@ -3326,7 +3386,7 @@ def test_ap_wps_upnp_http_proto(dev, apdev): conn.request("HEAD", 5000 * 'A') try: resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() @@ -3337,11 +3397,11 @@ def test_ap_wps_upnp_http_proto(dev, apdev): raise Exception("Unexpected response to HEAD: " + str(resp.status)) conn.close() - headers = { "Content-Length": '20' } + headers = {"Content-Length": '20'} conn.request("POST", "hello", 10 * 'A' + "\r\n\r\n", headers) try: resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() @@ -3354,7 +3414,7 @@ def test_ap_wps_upnp_http_proto(dev, apdev): conn.request("POST", "hello", 60000 * 'A' + "\r\n\r\n") try: resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() @@ -3365,11 +3425,11 @@ def test_ap_wps_upnp_http_proto_chunked(dev, apdev): location = ssdp_get_location(ap_uuid) - url = urlparse.urlparse(location) - conn = httplib.HTTPConnection(url.netloc) + url = urlparse(location) + conn = HTTPConnection(url.netloc) #conn.set_debuglevel(1) - headers = { "Transfer-Encoding": 'chunked' } + headers = {"Transfer-Encoding": 'chunked'} conn.request("POST", "hello", "a\r\nabcdefghij\r\n" + "2\r\nkl\r\n" + "0\r\n\r\n", headers) @@ -3381,10 +3441,10 @@ def test_ap_wps_upnp_http_proto_chunked(dev, apdev): conn.putrequest("POST", "hello") conn.putheader('Transfer-Encoding', 'chunked') conn.endheaders() - conn.send("a\r\nabcdefghij\r\n") + conn.send(b"a\r\nabcdefghij\r\n") time.sleep(0.1) - conn.send("2\r\nkl\r\n") - conn.send("0\r\n\r\n") + conn.send(b"2\r\nkl\r\n") + conn.send(b"0\r\n\r\n") resp = conn.getresponse() if resp.status != 404: raise Exception("Unexpected HTTP response: %d" % resp.status) @@ -3396,28 +3456,28 @@ def test_ap_wps_upnp_http_proto_chunked(dev, apdev): completed = False try: for i in range(20000): - conn.send("1\r\nZ\r\n") - conn.send("0\r\n\r\n") + conn.send(b"1\r\nZ\r\n") + conn.send(b"0\r\n\r\n") resp = conn.getresponse() completed = True - except Exception, e: + except Exception as e: pass conn.close() if completed: raise Exception("Too long chunked request did not result in connection reset") - headers = { "Transfer-Encoding": 'chunked' } + headers = {"Transfer-Encoding": 'chunked'} conn.request("POST", "hello", "80000000\r\na", headers) try: resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() conn.request("POST", "hello", "10000000\r\na", headers) try: resp = conn.getresponse() - except Exception, e: + except Exception as e: pass conn.close() @@ -3425,7 +3485,7 @@ def test_ap_wps_upnp_http_proto_chunked(dev, apdev): def test_ap_wps_disabled(dev, apdev): """WPS operations while WPS is disabled""" ssid = "test-wps-disabled" - hapd = hostapd.add_ap(apdev[0], { "ssid": ssid }) + hapd = hostapd.add_ap(apdev[0], {"ssid": ssid}) if "FAIL" not in hapd.request("WPS_PBC"): raise Exception("WPS_PBC succeeded unexpectedly") if "FAIL" not in hapd.request("WPS_CANCEL"): @@ -3434,8 +3494,8 @@ def test_ap_wps_disabled(dev, apdev): def test_ap_wps_mixed_cred(dev, apdev): """WPS 2.0 STA merging mixed mode WPA/WPA2 credentials""" ssid = "test-wps-wep" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "skip_cred_build": "1", "extra_cred": "wps-mixed-cred" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "skip_cred_build": "1", "extra_cred": "wps-mixed-cred"} hapd = hostapd.add_ap(apdev[0], params) hapd.request("WPS_PBC") dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") @@ -3451,7 +3511,8 @@ def test_ap_wps_mixed_cred(dev, apdev): if proto != "WPA RSN": raise Exception("Unexpected merged proto field value: " + proto) pairwise = dev[0].get_network(id, "pairwise") - if pairwise != "CCMP TKIP" and pairwise != "CCMP GCMP TKIP": + p = pairwise.split() + if "CCMP" not in p or "TKIP" not in p: raise Exception("Unexpected merged pairwise field value: " + pairwise) @remote_compatible @@ -3459,11 +3520,11 @@ def test_ap_wps_while_connected(dev, apdev): """WPS PBC provisioning while connected to another AP""" ssid = "test-wps-conf" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) - hostapd.add_ap(apdev[1], { "ssid": "open" }) + hostapd.add_ap(apdev[1], {"ssid": "open"}) dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") logger.info("WPS provisioning step") @@ -3480,11 +3541,11 @@ def test_ap_wps_while_connected_no_autoconnect(dev, apdev): """WPS PBC provisioning while connected to another AP and STA_AUTOCONNECT disabled""" ssid = "test-wps-conf" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) - hostapd.add_ap(apdev[1], { "ssid": "open" }) + hostapd.add_ap(apdev[1], {"ssid": "open"}) try: dev[0].request("STA_AUTOCONNECT 0") @@ -3506,9 +3567,9 @@ def test_ap_wps_from_event(dev, apdev): """WPS PBC event on AP to enable PBC""" ssid = "test-wps-conf" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") dev[0].dump_monitor() hapd.dump_monitor() @@ -3529,9 +3590,9 @@ def test_ap_wps_ap_scan_2(dev, apdev): """AP_SCAN 2 for WPS""" ssid = "test-wps-conf" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) hapd.request("WPS_PBC") wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') @@ -3551,18 +3612,32 @@ def test_ap_wps_ap_scan_2(dev, apdev): wpas.wait_connected(timeout=30) wpas.dump_monitor() wpas.request("DISCONNECT") + wpas.wait_disconnected() + id = wpas.list_networks()[0]['id'] + pairwise = wpas.get_network(id, "pairwise") + if "CCMP" not in pairwise.split(): + raise Exception("Unexpected pairwise parameter value: " + pairwise) + group = wpas.get_network(id, "group") + if "CCMP" not in group.split(): + raise Exception("Unexpected group parameter value: " + group) + # Need to select a single cipher for ap_scan=2 testing + wpas.set_network(id, "pairwise", "CCMP") + wpas.set_network(id, "group", "CCMP") wpas.request("BSS_FLUSH 0") wpas.dump_monitor() wpas.request("REASSOCIATE") wpas.wait_connected(timeout=30) wpas.dump_monitor() + wpas.request("DISCONNECT") + wpas.wait_disconnected() + wpas.flush_scan_cache() @remote_compatible def test_ap_wps_eapol_workaround(dev, apdev): """EAPOL workaround code path for 802.1X header length mismatch""" ssid = "test-wps" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "1" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "1"}) bssid = apdev[0]['bssid'] hapd.request("SET ext_eapol_frame_io 1") dev[0].request("SET ext_eapol_frame_io 1") @@ -3581,15 +3656,15 @@ def test_ap_wps_iteration(dev, apdev): """WPS PIN and iterate through APs without selected registrar""" ssid = "test-wps-conf" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) ssid2 = "test-wps-conf2" hapd2 = hostapd.add_ap(apdev[1], - { "ssid": ssid2, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid2, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") dev[0].scan_for_bss(apdev[1]['bssid'], freq="2412") @@ -3623,10 +3698,10 @@ def test_ap_wps_iteration_error(dev, apdev): """WPS AP iteration on no Selected Registrar and error case with an AP""" ssid = "test-wps-conf-pin" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "wps_independent": "1" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wps_independent": "1"}) hapd.request("SET ext_eapol_frame_io 1") bssid = apdev[0]['bssid'] pin = dev[0].wps_read_pin() @@ -3649,10 +3724,10 @@ def test_ap_wps_iteration_error(dev, apdev): # Start the real target AP and activate registrar on it. hapd2 = hostapd.add_ap(apdev[1], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "wps_independent": "1" }) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wps_independent": "1"}) hapd2.request("WPS_PIN any " + pin) dev[0].wait_disconnected(timeout=15) @@ -3669,9 +3744,9 @@ def test_ap_wps_priority(dev, apdev): """WPS PIN provisioning with configured AP and wps_priority""" ssid = "test-wps-conf-pin" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) logger.info("WPS provisioning step") pin = dev[0].wps_read_pin() hapd.request("WPS_PIN any " + pin) @@ -3691,10 +3766,10 @@ def test_ap_wps_priority(dev, apdev): @remote_compatible def test_ap_wps_and_non_wps(dev, apdev): """WPS and non-WPS AP in single hostapd process""" - params = { "ssid": "wps", "eap_server": "1", "wps_state": "1" } + params = {"ssid": "wps", "eap_server": "1", "wps_state": "1"} hapd = hostapd.add_ap(apdev[0], params) - params = { "ssid": "no wps" } + params = {"ssid": "no wps"} hapd2 = hostapd.add_ap(apdev[1], params) appin = hapd.request("WPS_AP_PIN random") @@ -3711,10 +3786,10 @@ def test_ap_wps_and_non_wps(dev, apdev): def test_ap_wps_init_oom(dev, apdev): """Initial AP configuration and OOM during PSK generation""" ssid = "test-wps" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "1"} hapd = hostapd.add_ap(apdev[0], params) - with alloc_fail(hapd, 1, "base64_encode;wps_build_cred"): + with alloc_fail(hapd, 1, "base64_gen_encode;?base64_encode;wps_build_cred"): pin = dev[0].wps_read_pin() hapd.request("WPS_PIN any " + pin) dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") @@ -3739,19 +3814,20 @@ def _test_ap_wps_er_oom(dev, apdev): ap_pin = "12345670" ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "device_name": "Wireless AP", "manufacturer": "Company", - "model_name": "WAP", "model_number": "123", - "serial_number": "12345", "device_type": "6-0050F204-1", - "os_version": "01020300", - "config_methods": "label push_button", - "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}) dev[0].connect(ssid, psk="12345678", scan_freq="2412") - with alloc_fail(dev[0], 1, "base64_decode;xml_get_base64_item"): + with alloc_fail(dev[0], 1, + "base64_gen_decode;?base64_decode;xml_get_base64_item"): dev[0].request("WPS_ER_START ifname=lo") ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=3) if ev is not None: @@ -3764,7 +3840,8 @@ def _test_ap_wps_er_oom(dev, apdev): raise Exception("AP discovery timed out") dev[1].scan_for_bss(apdev[0]['bssid'], freq=2412) - with alloc_fail(dev[0], 1, "base64_decode;xml_get_base64_item"): + with alloc_fail(dev[0], 1, + "base64_gen_decode;?base64_decode;xml_get_base64_item"): dev[1].request("WPS_PBC " + apdev[0]['bssid']) ev = dev[1].wait_event(["CTRL-EVENT-SCAN-RESULTS"], timeout=10) if ev is None: @@ -3825,20 +3902,20 @@ def test_ap_wps_wpa_cli_action(dev, apdev, test_params): ssid = "test-wps-conf" hapd = hostapd.add_ap(apdev[0], - { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) prg = os.path.join(test_params['logdir'], 'alt-wpa_supplicant/wpa_supplicant/wpa_cli') if not os.path.exists(prg): prg = '../../wpa_supplicant/wpa_cli' - arg = [ prg, '-P', pidfile, '-B', '-i', dev[0].ifname, '-a', actionfile ] + arg = [prg, '-P', pidfile, '-B', '-i', dev[0].ifname, '-a', actionfile] subprocess.call(arg) - arg = [ 'ps', 'ax' ] + arg = ['ps', 'ax'] cmd = subprocess.Popen(arg, stdout=subprocess.PIPE) - out = cmd.communicate()[0] + out = cmd.communicate()[0].decode() cmd.wait() logger.debug("Processes:\n" + out) if "wpa_cli -P %s -B -i %s" % (pidfile, dev[0].ifname) not in out: @@ -3862,9 +3939,9 @@ def test_ap_wps_wpa_cli_action(dev, apdev, test_params): if "WPS-SUCCESS" not in res: raise Exception("WPS-SUCCESS event not seen in action file") - arg = [ 'ps', 'ax' ] + arg = ['ps', 'ax'] cmd = subprocess.Popen(arg, stdout=subprocess.PIPE) - out = cmd.communicate()[0] + out = cmd.communicate()[0].decode() cmd.wait() logger.debug("Remaining processes:\n" + out) if "wpa_cli -P %s -B -i %s" % (pidfile, dev[0].ifname) in out: @@ -3889,50 +3966,52 @@ def _test_ap_wps_er_ssdp_proto(dev, apdev): raise Exception("Invalid filter accepted") if "OK" not in dev[0].request("WPS_ER_START ifname=lo 1.2.3.4"): raise Exception("WPS_ER_START with filter failed") - (msg,addr) = sock.recvfrom(1000) + (msg, addr) = sock.recvfrom(1000) + msg = msg.decode() logger.debug("Received SSDP message from %s: %s" % (str(addr), msg)) if "M-SEARCH" not in msg: raise Exception("Not an M-SEARCH") - sock.sendto("FOO", addr) + sock.sendto(b"FOO", addr) time.sleep(0.1) dev[0].request("WPS_ER_STOP") dev[0].request("WPS_ER_START ifname=lo") - (msg,addr) = sock.recvfrom(1000) + (msg, addr) = sock.recvfrom(1000) + msg = msg.decode() logger.debug("Received SSDP message from %s: %s" % (str(addr), msg)) if "M-SEARCH" not in msg: raise Exception("Not an M-SEARCH") - sock.sendto("FOO", addr) - sock.sendto("HTTP/1.1 200 OK\r\nFOO\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nNTS:foo\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nNTS:ssdp:byebye\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\ncache-control: foo=1\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\ncache-control: max-age=1\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nusn:\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nusn:foo\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nusn: uuid:\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nusn: uuid: \r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nusn: uuid: foo\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nNTS:ssdp:byebye\r\n\r\n", addr) - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:foo\r\n\r\n", addr) + sock.sendto(b"FOO", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nFOO\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nNTS:foo\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nNTS:ssdp:byebye\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\ncache-control: foo=1\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\ncache-control: max-age=1\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nusn:\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nusn:foo\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nusn: uuid:\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nusn: uuid: \r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nusn: uuid: foo\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nNTS:ssdp:byebye\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:foo\r\n\r\n", addr) with alloc_fail(dev[0], 1, "wps_er_ap_add"): - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:foo\r\ncache-control:max-age=1\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:foo\r\ncache-control:max-age=1\r\n\r\n", addr) time.sleep(0.1) with alloc_fail(dev[0], 2, "wps_er_ap_add"): - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:foo\r\ncache-control:max-age=1\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:foo\r\ncache-control:max-age=1\r\n\r\n", addr) time.sleep(0.1) # Add an AP with bogus URL - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:foo\r\ncache-control:max-age=1\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:foo\r\ncache-control:max-age=1\r\n\r\n", addr) # Update timeout on AP without updating URL - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:http://127.0.0.1:12345/foo.xml\r\ncache-control:max-age=1\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:http://127.0.0.1:12345/foo.xml\r\ncache-control:max-age=1\r\n\r\n", addr) ev = dev[0].wait_event(["WPS-ER-AP-REMOVE"], timeout=5) if ev is None: raise Exception("No WPS-ER-AP-REMOVE event on max-age timeout") # Add an AP with a valid URL (but no server listing to it) - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:http://127.0.0.1:12345/foo.xml\r\ncache-control:max-age=1\r\n\r\n", addr) + sock.sendto(b"HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:http://127.0.0.1:12345/foo.xml\r\ncache-control:max-age=1\r\n\r\n", addr) ev = dev[0].wait_event(["WPS-ER-AP-REMOVE"], timeout=5) if ev is None: raise Exception("No WPS-ER-AP-REMOVE event on max-age timeout") @@ -3980,7 +4059,7 @@ def gen_upnp_info(eventSubURL='wps_event', controlURL='wps_control', 'Connection: close\r\n' + \ 'Content-Length: ' + str(len(payload)) + '\r\n' + \ 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n' - return hdr + payload + return (hdr + payload).encode() def gen_wps_control(payload_override=None): payload = ''' @@ -4008,7 +4087,7 @@ AAYANyoAASA= 'Connection: close\r\n' + \ 'Content-Length: ' + str(len(payload)) + '\r\n' + \ 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n' - return hdr + payload + return (hdr + payload).encode() def gen_wps_event(sid='uuid:7eb3342a-8a5f-47fe-a585-0785bfec6d8a'): payload = "" @@ -4021,14 +4100,14 @@ def gen_wps_event(sid='uuid:7eb3342a-8a5f-47fe-a585-0785bfec6d8a'): hdr += 'SID: ' + sid + '\r\n' hdr += 'Timeout: Second-1801\r\n' + \ 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n' - return hdr + payload + return (hdr + payload).encode() -class WPSAPHTTPServer(SocketServer.StreamRequestHandler): +class WPSAPHTTPServer(StreamRequestHandler): def handle(self): - data = self.rfile.readline().strip() + data = self.rfile.readline().decode().strip() logger.info("HTTP server received: " + data) while True: - hdr = self.rfile.readline().strip() + hdr = self.rfile.readline().decode().strip() if len(hdr) == 0: break logger.info("HTTP header: " + hdr) @@ -4057,10 +4136,10 @@ class WPSAPHTTPServer(SocketServer.StreamRequestHandler): def handle_others(self, data): logger.info("Ignore HTTP request: " + data) -class MyTCPServer(SocketServer.TCPServer): +class MyTCPServer(TCPServer): def __init__(self, addr, handler): self.allow_reuse_address = True - SocketServer.TCPServer.__init__(self, addr, handler) + TCPServer.__init__(self, addr, handler) def wps_er_start(dev, http_server, max_age=1, wait_m_search=False, location_url=None): @@ -4070,7 +4149,8 @@ def wps_er_start(dev, http_server, max_age=1, wait_m_search=False, sock.bind(("239.255.255.250", 1900)) dev.request("WPS_ER_START ifname=lo") for i in range(100): - (msg,addr) = sock.recvfrom(1000) + (msg, addr) = sock.recvfrom(1000) + msg = msg.decode() logger.debug("Received SSDP message from %s: %s" % (str(addr), msg)) if "M-SEARCH" in msg: break @@ -4083,9 +4163,9 @@ def wps_er_start(dev, http_server, max_age=1, wait_m_search=False, server = MyTCPServer(("127.0.0.1", 12345), http_server) if not location_url: location_url = 'http://127.0.0.1:12345/foo.xml' - sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:%s\r\ncache-control:max-age=%d\r\n\r\n" % (location_url, max_age), addr) + sock.sendto(("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:%s\r\ncache-control:max-age=%d\r\n\r\n" % (location_url, max_age)).encode(), addr) server.timeout = 1 - return server,sock + return server, sock def wps_er_stop(dev, sock, server, on_alloc_fail=False): sock.close() @@ -4110,7 +4190,7 @@ def wps_er_stop(dev, sock, server, on_alloc_fail=False): def run_wps_er_proto_test(dev, handler, no_event_url=False, location_url=None): try: uuid = '27ea801a-9e5c-4e73-bd82-f89cbcd10d7e' - server,sock = wps_er_start(dev, handler, location_url=location_url) + server, sock = wps_er_start(dev, handler, location_url=location_url) global wps_event_url wps_event_url = None server.handle_request() @@ -4128,26 +4208,26 @@ def run_wps_er_proto_test(dev, handler, no_event_url=False, location_url=None): dev.request("WPS_ER_STOP") def send_wlanevent(url, uuid, data, no_response=False): - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) payload = ''' 1 1 ''' - payload += base64.b64encode(data) + payload += base64.b64encode(data).decode() payload += '' - headers = { "Content-type": 'text/xml; charset="utf-8"', - "Server": "Unspecified, UPnP/1.0, Unspecified", - "HOST": url.netloc, - "NT": "upnp:event", - "SID": "uuid:" + uuid, - "SEQ": "0", - "Content-Length": str(len(payload)) } + headers = {"Content-type": 'text/xml; charset="utf-8"', + "Server": "Unspecified, UPnP/1.0, Unspecified", + "HOST": url.netloc, + "NT": "upnp:event", + "SID": "uuid:" + uuid, + "SEQ": "0", + "Content-Length": str(len(payload))} conn.request("NOTIFY", url.path, payload, headers) if no_response: try: conn.getresponse() - except Exception, e: + except Exception as e: pass return resp = conn.getresponse() @@ -4163,7 +4243,7 @@ def test_ap_wps_er_http_proto(dev, apdev): def _test_ap_wps_er_http_proto(dev, apdev): uuid = '27ea801a-9e5c-4e73-bd82-f89cbcd10d7e' - server,sock = wps_er_start(dev[0], WPSAPHTTPServer, max_age=15) + server, sock = wps_er_start(dev[0], WPSAPHTTPServer, max_age=15) global wps_event_url wps_event_url = None server.handle_request() @@ -4183,8 +4263,8 @@ def _test_ap_wps_er_http_proto(dev, apdev): sock.close() logger.info("Valid Probe Request notification") - url = urlparse.urlparse(wps_event_url) - conn = httplib.HTTPConnection(url.netloc) + url = urlparse(wps_event_url) + conn = HTTPConnection(url.netloc) payload = ''' 1 @@ -4195,13 +4275,13 @@ RGV2aWNlIEEQSQAGADcqAAEg ''' - headers = { "Content-type": 'text/xml; charset="utf-8"', - "Server": "Unspecified, UPnP/1.0, Unspecified", - "HOST": url.netloc, - "NT": "upnp:event", - "SID": "uuid:" + uuid, - "SEQ": "0", - "Content-Length": str(len(payload)) } + headers = {"Content-type": 'text/xml; charset="utf-8"', + "Server": "Unspecified, UPnP/1.0, Unspecified", + "HOST": url.netloc, + "NT": "upnp:event", + "SID": "uuid:" + uuid, + "SEQ": "0", + "Content-Length": str(len(payload))} conn.request("NOTIFY", url.path, payload, headers) resp = conn.getresponse() if resp.status != 200: @@ -4214,32 +4294,32 @@ RGV2aWNlIEEQSQAGADcqAAEg raise Exception("No Enrollee UUID match") logger.info("Incorrect event URL AP id") - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("NOTIFY", url.path + '123', payload, headers) resp = conn.getresponse() if resp.status != 404: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.info("Missing AP id") - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("NOTIFY", '/event/' + url.path.split('/')[2], payload, headers) time.sleep(0.1) logger.info("Incorrect event URL event id") - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("NOTIFY", '/event/123456789/123', payload, headers) time.sleep(0.1) logger.info("Incorrect event URL prefix") - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("NOTIFY", '/foobar/123456789/123', payload, headers) resp = conn.getresponse() if resp.status != 404: raise Exception("Unexpected HTTP response: %d" % resp.status) logger.info("Unsupported request") - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("FOOBAR", '/foobar/123456789/123', payload, headers) resp = conn.getresponse() if resp.status != 501: @@ -4247,169 +4327,169 @@ RGV2aWNlIEEQSQAGADcqAAEg logger.info("Unsupported request and OOM") with alloc_fail(dev[0], 1, "wps_er_http_req"): - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("FOOBAR", '/foobar/123456789/123', payload, headers) time.sleep(0.5) logger.info("Too short WLANEvent") - data = '\x00' + data = b'\x00' send_wlanevent(url, uuid, data) logger.info("Invalid WLANEventMAC") - data = '\x00qwertyuiopasdfghjklzxcvbnm' + data = b'\x00qwertyuiopasdfghjklzxcvbnm' send_wlanevent(url, uuid, data) logger.info("Unknown WLANEventType") - data = '\xff02:00:00:00:00:00' + data = b'\xff02:00:00:00:00:00' send_wlanevent(url, uuid, data) logger.info("Probe Request notification without any attributes") - data = '\x0102:00:00:00:00:00' + data = b'\x0102:00:00:00:00:00' send_wlanevent(url, uuid, data) logger.info("Probe Request notification with invalid attribute") - data = '\x0102:00:00:00:00:00\xff' + data = b'\x0102:00:00:00:00:00\xff' send_wlanevent(url, uuid, data) logger.info("EAP message without any attributes") - data = '\x0202:00:00:00:00:00' + data = b'\x0202:00:00:00:00:00' send_wlanevent(url, uuid, data) logger.info("EAP message with invalid attribute") - data = '\x0202:00:00:00:00:00\xff' + data = b'\x0202:00:00:00:00:00\xff' send_wlanevent(url, uuid, data) logger.info("EAP message from new STA and not M1") - data = '\x0202:ff:ff:ff:ff:ff' + '\x10\x22\x00\x01\x05' + data = b'\x0202:ff:ff:ff:ff:ff' + b'\x10\x22\x00\x01\x05' send_wlanevent(url, uuid, data) logger.info("EAP message: M1") - data = '\x0202:00:00:00:00:00' - data += '\x10\x22\x00\x01\x04' - data += '\x10\x47\x00\x10' + 16*'\x00' - data += '\x10\x20\x00\x06\x02\x00\x00\x00\x00\x00' - data += '\x10\x1a\x00\x10' + 16*'\x00' - data += '\x10\x32\x00\xc0' + 192*'\x00' - data += '\x10\x04\x00\x02\x00\x00' - data += '\x10\x10\x00\x02\x00\x00' - data += '\x10\x0d\x00\x01\x00' - data += '\x10\x08\x00\x02\x00\x00' - data += '\x10\x44\x00\x01\x00' - data += '\x10\x21\x00\x00' - data += '\x10\x23\x00\x00' - data += '\x10\x24\x00\x00' - data += '\x10\x42\x00\x00' - data += '\x10\x54\x00\x08' + 8*'\x00' - data += '\x10\x11\x00\x00' - data += '\x10\x3c\x00\x01\x00' - data += '\x10\x02\x00\x02\x00\x00' - data += '\x10\x12\x00\x02\x00\x00' - data += '\x10\x09\x00\x02\x00\x00' - data += '\x10\x2d\x00\x04\x00\x00\x00\x00' + data = b'\x0202:00:00:00:00:00' + data += b'\x10\x22\x00\x01\x04' + data += b'\x10\x47\x00\x10' + 16 * b'\x00' + data += b'\x10\x20\x00\x06\x02\x00\x00\x00\x00\x00' + data += b'\x10\x1a\x00\x10' + 16 * b'\x00' + data += b'\x10\x32\x00\xc0' + 192 * b'\x00' + data += b'\x10\x04\x00\x02\x00\x00' + data += b'\x10\x10\x00\x02\x00\x00' + data += b'\x10\x0d\x00\x01\x00' + data += b'\x10\x08\x00\x02\x00\x00' + data += b'\x10\x44\x00\x01\x00' + data += b'\x10\x21\x00\x00' + data += b'\x10\x23\x00\x00' + data += b'\x10\x24\x00\x00' + data += b'\x10\x42\x00\x00' + data += b'\x10\x54\x00\x08' + 8 * b'\x00' + data += b'\x10\x11\x00\x00' + data += b'\x10\x3c\x00\x01\x00' + data += b'\x10\x02\x00\x02\x00\x00' + data += b'\x10\x12\x00\x02\x00\x00' + data += b'\x10\x09\x00\x02\x00\x00' + data += b'\x10\x2d\x00\x04\x00\x00\x00\x00' m1 = data send_wlanevent(url, uuid, data) logger.info("EAP message: WSC_ACK") - data = '\x0202:00:00:00:00:00' + '\x10\x22\x00\x01\x0d' + data = b'\x0202:00:00:00:00:00' + b'\x10\x22\x00\x01\x0d' send_wlanevent(url, uuid, data) logger.info("EAP message: M1") send_wlanevent(url, uuid, m1) logger.info("EAP message: WSC_NACK") - data = '\x0202:00:00:00:00:00' + '\x10\x22\x00\x01\x0e' + data = b'\x0202:00:00:00:00:00' + b'\x10\x22\x00\x01\x0e' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 - Too long attribute values") - data = '\x0202:00:00:00:00:00' - data += '\x10\x11\x00\x21' + 33*'\x00' - data += '\x10\x45\x00\x21' + 33*'\x00' - data += '\x10\x42\x00\x21' + 33*'\x00' - data += '\x10\x24\x00\x21' + 33*'\x00' - data += '\x10\x23\x00\x21' + 33*'\x00' - data += '\x10\x21\x00\x41' + 65*'\x00' - data += '\x10\x49\x00\x09\x00\x37\x2a\x05\x02\x00\x00\x05\x00' + data = b'\x0202:00:00:00:00:00' + data += b'\x10\x11\x00\x21' + 33 * b'\x00' + data += b'\x10\x45\x00\x21' + 33 * b'\x00' + data += b'\x10\x42\x00\x21' + 33 * b'\x00' + data += b'\x10\x24\x00\x21' + 33 * b'\x00' + data += b'\x10\x23\x00\x21' + 33 * b'\x00' + data += b'\x10\x21\x00\x41' + 65 * b'\x00' + data += b'\x10\x49\x00\x09\x00\x37\x2a\x05\x02\x00\x00\x05\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing UUID-E") - data = '\x0202:00:00:00:00:00' - data += '\x10\x22\x00\x01\x04' + data = b'\x0202:00:00:00:00:00' + data += b'\x10\x22\x00\x01\x04' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing MAC Address") - data += '\x10\x47\x00\x10' + 16*'\x00' + data += b'\x10\x47\x00\x10' + 16 * b'\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Enrollee Nonce") - data += '\x10\x20\x00\x06\x02\x00\x00\x00\x00\x00' + data += b'\x10\x20\x00\x06\x02\x00\x00\x00\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Public Key") - data += '\x10\x1a\x00\x10' + 16*'\x00' + data += b'\x10\x1a\x00\x10' + 16 * b'\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Authentication Type flags") - data += '\x10\x32\x00\xc0' + 192*'\x00' + data += b'\x10\x32\x00\xc0' + 192 * b'\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Encryption Type Flags") - data += '\x10\x04\x00\x02\x00\x00' + data += b'\x10\x04\x00\x02\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Connection Type flags") - data += '\x10\x10\x00\x02\x00\x00' + data += b'\x10\x10\x00\x02\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Config Methods") - data += '\x10\x0d\x00\x01\x00' + data += b'\x10\x0d\x00\x01\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Wi-Fi Protected Setup State") - data += '\x10\x08\x00\x02\x00\x00' + data += b'\x10\x08\x00\x02\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Manufacturer") - data += '\x10\x44\x00\x01\x00' + data += b'\x10\x44\x00\x01\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Model Name") - data += '\x10\x21\x00\x00' + data += b'\x10\x21\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Model Number") - data += '\x10\x23\x00\x00' + data += b'\x10\x23\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Serial Number") - data += '\x10\x24\x00\x00' + data += b'\x10\x24\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Primary Device Type") - data += '\x10\x42\x00\x00' + data += b'\x10\x42\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Device Name") - data += '\x10\x54\x00\x08' + 8*'\x00' + data += b'\x10\x54\x00\x08' + 8 * b'\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing RF Bands") - data += '\x10\x11\x00\x00' + data += b'\x10\x11\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Association State") - data += '\x10\x3c\x00\x01\x00' + data += b'\x10\x3c\x00\x01\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Device Password ID") - data += '\x10\x02\x00\x02\x00\x00' + data += b'\x10\x02\x00\x02\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing Configuration Error") - data += '\x10\x12\x00\x02\x00\x00' + data += b'\x10\x12\x00\x02\x00\x00' send_wlanevent(url, uuid, data) logger.info("EAP message: M1 missing OS Version") - data += '\x10\x09\x00\x02\x00\x00' + data += b'\x10\x09\x00\x02\x00\x00' send_wlanevent(url, uuid, data) logger.info("Check max concurrent requests") @@ -4418,32 +4498,35 @@ RGV2aWNlIEEQSQAGADcqAAEg for i in range(20): socks[i] = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) + socks[i].settimeout(10) socks[i].connect(addr) for i in range(20): - socks[i].send("GET / HTTP/1.1\r\n\r\n") + socks[i].send(b"GET / HTTP/1.1\r\n\r\n") count = 0 for i in range(20): try: - res = socks[i].recv(100) + res = socks[i].recv(100).decode() if "HTTP/1" in res: count += 1 + else: + logger.info("recv[%d]: len=%d" % (i, len(res))) except: pass socks[i].close() logger.info("%d concurrent HTTP GET operations returned response" % count) - if count < 10: + if count < 8: raise Exception("Too few concurrent HTTP connections accepted") logger.info("OOM in HTTP server") - for func in [ "http_request_init", "httpread_create", - "eloop_register_timeout;httpread_create", - "eloop_sock_table_add_sock;?eloop_register_sock;httpread_create", - "httpread_hdr_analyze" ]: + for func in ["http_request_init", "httpread_create", + "eloop_register_timeout;httpread_create", + "eloop_sock_table_add_sock;?eloop_register_sock;httpread_create", + "httpread_hdr_analyze"]: with alloc_fail(dev[0], 1, func): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) sock.connect(addr) - sock.send("GET / HTTP/1.1\r\n\r\n") + sock.send(b"GET / HTTP/1.1\r\n\r\n") try: sock.recv(100) except: @@ -4451,22 +4534,22 @@ RGV2aWNlIEEQSQAGADcqAAEg sock.close() logger.info("Invalid HTTP header") - for req in [ " GET / HTTP/1.1\r\n\r\n", - "HTTP/1.1 200 OK\r\n\r\n", - "HTTP/\r\n\r\n", - "GET %%a%aa% HTTP/1.1\r\n\r\n", - "GET / HTTP/1.1\r\n FOO\r\n\r\n", - "NOTIFY / HTTP/1.1\r\n" + 4097*'a' + '\r\n\r\n', - "NOTIFY / HTTP/1.1\r\n\r\n" + 8193*'a', - "POST / HTTP/1.1\r\nTransfer-Encoding: CHUNKED\r\n\r\n foo\r\n", - "POST / HTTP/1.1\r\nTransfer-Encoding: CHUNKED\r\n\r\n1\r\nfoo\r\n", - "POST / HTTP/1.1\r\nTransfer-Encoding: CHUNKED\r\n\r\n0\r\n", - "POST / HTTP/1.1\r\nTransfer-Encoding: CHUNKED\r\n\r\n0\r\naa\ra\r\n\ra" ]: + for req in [" GET / HTTP/1.1\r\n\r\n", + "HTTP/1.1 200 OK\r\n\r\n", + "HTTP/\r\n\r\n", + "GET %%a%aa% HTTP/1.1\r\n\r\n", + "GET / HTTP/1.1\r\n FOO\r\n\r\n", + "NOTIFY / HTTP/1.1\r\n" + 4097*'a' + '\r\n\r\n', + "NOTIFY / HTTP/1.1\r\n\r\n" + 8193*'a', + "POST / HTTP/1.1\r\nTransfer-Encoding: CHUNKED\r\n\r\n foo\r\n", + "POST / HTTP/1.1\r\nTransfer-Encoding: CHUNKED\r\n\r\n1\r\nfoo\r\n", + "POST / HTTP/1.1\r\nTransfer-Encoding: CHUNKED\r\n\r\n0\r\n", + "POST / HTTP/1.1\r\nTransfer-Encoding: CHUNKED\r\n\r\n0\r\naa\ra\r\n\ra"]: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) sock.settimeout(0.1) sock.connect(addr) - sock.send(req) + sock.send(req.encode()) try: sock.recv(100) except: @@ -4477,53 +4560,53 @@ RGV2aWNlIEEQSQAGADcqAAEg sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) sock.connect(addr) - sock.send("NOTIFY / HTTP/1.1\r\n\r\n" + 4500*'a') + sock.send(b"NOTIFY / HTTP/1.1\r\n\r\n" + 4500 * b'a') try: sock.recv(100) except: pass sock.close() - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) payload = '', - "NT": "upnp:event", - "timeout": "Second-1234" } + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 200: @@ -5892,18 +5977,18 @@ def test_ap_wps_set_selected_registrar_proto(dev, apdev): logger.debug("Subscription SID " + sid) server.handle_request() - tests = [ (500, "10"), - (200, "104a000110" + "1041000101" + "101200020000" + - "105300023148" + - "1049002c00372a0001200124111111111111222222222222333333333333444444444444555555555555666666666666" + - "10480010362db47ba53a519188fb5458b986b2e4"), - (200, "104a000110" + "1041000100" + "101200020000" + - "105300020000"), - (200, "104a000110" + "1041000100"), - (200, "104a000110") ] - for status,test in tests: + tests = [(500, "10"), + (200, "104a000110" + "1041000101" + "101200020000" + + "105300023148" + + "1049002c00372a0001200124111111111111222222222222333333333333444444444444555555555555666666666666" + + "10480010362db47ba53a519188fb5458b986b2e4"), + (200, "104a000110" + "1041000100" + "101200020000" + + "105300020000"), + (200, "104a000110" + "1041000100"), + (200, "104a000110")] + for status, test in tests: tlvs = binascii.unhexlify(test) - newmsg = base64.b64encode(tlvs) + newmsg = base64.b64encode(tlvs).decode() msg = '\n' msg += '' msg += '' @@ -5911,7 +5996,7 @@ def test_ap_wps_set_selected_registrar_proto(dev, apdev): msg += '' msg += newmsg msg += "" - headers = { "Content-type": 'text/xml; charset="utf-8"' } + headers = {"Content-type": 'text/xml; charset="utf-8"'} headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % "SetSelectedRegistrar" conn.request("POST", ctrlurl.path, msg, headers) resp = conn.getresponse() @@ -6080,7 +6165,7 @@ def get_wsc_msg(dev): # Parse EAPOL header if len(data) < 4: raise Exception("No room for EAPOL header") - version,type,length = struct.unpack('>BBH', data[0:4]) + version, type, length = struct.unpack('>BBH', data[0:4]) msg['eapol_version'] = version msg['eapol_type'] = type msg['eapol_length'] = length @@ -6093,7 +6178,7 @@ def get_wsc_msg(dev): # Parse EAP header if len(data) < 4: raise Exception("No room for EAP header") - code,identifier,length = struct.unpack('>BBH', data[0:4]) + code, identifier, length = struct.unpack('>BBH', data[0:4]) msg['eap_code'] = code msg['eap_identifier'] = identifier msg['eap_length'] = length @@ -6104,13 +6189,13 @@ def get_wsc_msg(dev): # Parse EAP expanded header if len(data) < 1: raise Exception("No EAP type included") - msg['eap_type'], = struct.unpack('B', data[0]) + msg['eap_type'], = struct.unpack('B', data[0:1]) data = data[1:] if msg['eap_type'] == 254: if len(data) < 3 + 4: raise Exception("Truncated EAP expanded header") - msg['eap_vendor_id'], msg['eap_vendor_type'] = struct.unpack('>LL', '\0' + data[0:7]) + msg['eap_vendor_id'], msg['eap_vendor_type'] = struct.unpack('>LL', b'\x00' + data[0:7]) data = data[7:] else: raise Exception("Unexpected EAP type") @@ -6132,7 +6217,7 @@ def get_wsc_msg(dev): while len(data) > 0: if len(data) < 4: raise Exception("Truncated attribute header") - attr,length = struct.unpack('>HH', data[0:4]) + attr, length = struct.unpack('>HH', data[0:4]) data = data[4:] if length > len(data): raise Exception("Truncated attribute 0x%04x" % attr) @@ -6152,7 +6237,8 @@ def recv_wsc_msg(dev, opcode, msg_type): return msg, msg['wsc_attrs'], msg['raw_attrs'] def build_wsc_attr(attr, payload): - return struct.pack('>HH', attr, len(payload)) + payload + _payload = payload if type(payload) == bytes else payload.encode() + return struct.pack('>HH', attr, len(_payload)) + _payload def build_attr_msg_type(msg_type): return build_wsc_attr(ATTR_MSG_TYPE, struct.pack('B', msg_type)) @@ -6190,7 +6276,7 @@ def build_eap_failure(eap_id): return msg def send_wsc_msg(dev, src, msg): - res = dev.request("EAPOL_RX " + src + " " + binascii.hexlify(msg)) + res = dev.request("EAPOL_RX " + src + " " + binascii.hexlify(msg).decode()) if "OK" not in res: raise Exception("EAPOL_RX failed") @@ -6198,32 +6284,32 @@ group_5_prime = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC group_5_generator = 2 def wsc_kdf(key, label, bits): - result = '' + result = b'' i = 1 while len(result) * 8 < bits: - data = struct.pack('>L', i) + label + struct.pack('>L', bits) + data = struct.pack('>L', i) + label.encode() + struct.pack('>L', bits) m = hmac.new(key, data, hashlib.sha256) result += m.digest() i += 1 - return result[0:bits / 8] + return result[0:bits // 8] def wsc_keys(kdk): keys = wsc_kdf(kdk, "Wi-Fi Easy and Secure Key Derivation", 640) authkey = keys[0:32] keywrapkey = keys[32:48] emsk = keys[48:80] - return authkey,keywrapkey,emsk + return authkey, keywrapkey, emsk def wsc_dev_pw_half_psk(authkey, dev_pw): - m = hmac.new(authkey, dev_pw, hashlib.sha256) + m = hmac.new(authkey, dev_pw.encode(), hashlib.sha256) return m.digest()[0:16] def wsc_dev_pw_psk(authkey, dev_pw): - dev_pw_1 = dev_pw[0:len(dev_pw) / 2] - dev_pw_2 = dev_pw[len(dev_pw) / 2:] + dev_pw_1 = dev_pw[0:len(dev_pw) // 2] + dev_pw_2 = dev_pw[len(dev_pw) // 2:] psk1 = wsc_dev_pw_half_psk(authkey, dev_pw_1) psk2 = wsc_dev_pw_half_psk(authkey, dev_pw_2) - return psk1,psk2 + return psk1, psk2 def build_attr_authenticator(authkey, prev_msg, curr_msg): m = hmac.new(authkey, prev_msg + curr_msg, hashlib.sha256) @@ -6234,7 +6320,7 @@ def build_attr_encr_settings(authkey, keywrapkey, data): m = hmac.new(authkey, data, hashlib.sha256) kwa = m.digest()[0:8] data += build_wsc_attr(ATTR_KEY_WRAP_AUTH, kwa) - iv = 16*'\x99' + iv = 16*b'\x99' aes = AES.new(keywrapkey, AES.MODE_CBC, iv) pad_len = 16 - len(data) % 16 ps = pad_len * struct.pack('B', pad_len) @@ -6249,18 +6335,18 @@ def decrypt_attr_encr_settings(authkey, keywrapkey, data): encr = data[16:] aes = AES.new(keywrapkey, AES.MODE_CBC, iv) decrypted = aes.decrypt(encr) - pad_len, = struct.unpack('B', decrypted[-1]) + pad_len, = struct.unpack('B', decrypted[-1:]) if pad_len > len(decrypted): raise Exception("Invalid padding in Encrypted Settings") for i in range(-pad_len, -1): if decrypted[i] != decrypted[-1]: raise Exception("Invalid PS value in Encrypted Settings") - + decrypted = decrypted[0:len(decrypted) - pad_len] if len(decrypted) < 12: raise Exception("Truncated Encrypted Settings plaintext") kwa = decrypted[-12:] - attr,length = struct.unpack(">HH", kwa[0:4]) + attr, length = struct.unpack(">HH", kwa[0:4]) if attr != ATTR_KEY_WRAP_AUTH or length != 8: raise Exception("Invalid KWA header") kwa = kwa[4:] @@ -6287,10 +6373,10 @@ def wsc_dh_init(): return own_private, pk def wsc_dh_kdf(peer_pk, own_private, mac_addr, e_nonce, r_nonce): - peer_public = long(binascii.hexlify(peer_pk), 16) + peer_public = int(binascii.hexlify(peer_pk), 16) if peer_public < 2 or peer_public >= group_5_prime: raise Exception("Invalid peer public key") - if pow(peer_public, (group_5_prime - 1) / 2, group_5_prime) != 1: + if pow(peer_public, (group_5_prime - 1) // 2, group_5_prime) != 1: raise Exception("Unexpected Legendre symbol for peer public key") shared_secret = pow(peer_public, own_private, group_5_prime) @@ -6298,34 +6384,34 @@ def wsc_dh_kdf(peer_pk, own_private, mac_addr, e_nonce, r_nonce): logger.debug("DH shared secret: " + ss) dhkey = hashlib.sha256(binascii.unhexlify(ss)).digest() - logger.debug("DHKey: " + binascii.hexlify(dhkey)) + logger.debug("DHKey: " + binascii.hexlify(dhkey).decode()) m = hmac.new(dhkey, e_nonce + mac_addr + r_nonce, hashlib.sha256) kdk = m.digest() - logger.debug("KDK: " + binascii.hexlify(kdk)) - authkey,keywrapkey,emsk = wsc_keys(kdk) - logger.debug("AuthKey: " + binascii.hexlify(authkey)) - logger.debug("KeyWrapKey: " + binascii.hexlify(keywrapkey)) - logger.debug("EMSK: " + binascii.hexlify(emsk)) - return authkey,keywrapkey + logger.debug("KDK: " + binascii.hexlify(kdk).decode()) + authkey, keywrapkey, emsk = wsc_keys(kdk) + logger.debug("AuthKey: " + binascii.hexlify(authkey).decode()) + logger.debug("KeyWrapKey: " + binascii.hexlify(keywrapkey).decode()) + logger.debug("EMSK: " + binascii.hexlify(emsk).decode()) + return authkey, keywrapkey def wsc_dev_pw_hash(authkey, dev_pw, e_pk, r_pk): - psk1,psk2 = wsc_dev_pw_psk(authkey, dev_pw) - logger.debug("PSK1: " + binascii.hexlify(psk1)) - logger.debug("PSK2: " + binascii.hexlify(psk2)) + psk1, psk2 = wsc_dev_pw_psk(authkey, dev_pw) + logger.debug("PSK1: " + binascii.hexlify(psk1).decode()) + logger.debug("PSK2: " + binascii.hexlify(psk2).decode()) # Note: Secret values are supposed to be random, but hardcoded values are # fine for testing. - s1 = 16*'\x77' + s1 = 16*b'\x77' m = hmac.new(authkey, s1 + psk1 + e_pk + r_pk, hashlib.sha256) hash1 = m.digest() - logger.debug("Hash1: " + binascii.hexlify(hash1)) + logger.debug("Hash1: " + binascii.hexlify(hash1).decode()) - s2 = 16*'\x88' + s2 = 16*b'\x88' m = hmac.new(authkey, s2 + psk2 + e_pk + r_pk, hashlib.sha256) hash2 = m.digest() - logger.debug("Hash2: " + binascii.hexlify(hash2)) - return s1,s2,hash1,hash2 + logger.debug("Hash2: " + binascii.hexlify(hash2).decode()) + return s1, s2, hash1, hash2 def build_m1(eap_id, uuid_e, mac_addr, e_nonce, e_pk, manufacturer='', model_name='', config_methods='\x00\x00'): @@ -6437,7 +6523,7 @@ def build_nack(eap_id, e_nonce, r_nonce, config_error='\x00\x00', def test_wps_ext(dev, apdev): """WPS against external implementation""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -6448,8 +6534,8 @@ def test_wps_ext(dev, apdev): wsc_start_id = msg['eap_identifier'] mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -6460,11 +6546,11 @@ def test_wps_ext(dev, apdev): logger.debug("Receive M2 from AP") msg, m2_attrs, raw_m2_attrs = recv_wsc_msg(hapd, WSC_MSG, WPS_M2) - authkey,keywrapkey = wsc_dh_kdf(m2_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, e_nonce, - m2_attrs[ATTR_REGISTRAR_NONCE]) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, - m2_attrs[ATTR_PUBLIC_KEY]) + authkey, keywrapkey = wsc_dh_kdf(m2_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, e_nonce, + m2_attrs[ATTR_REGISTRAR_NONCE]) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, + m2_attrs[ATTR_PUBLIC_KEY]) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -6512,7 +6598,7 @@ def test_wps_ext(dev, apdev): msg, m8_attrs, raw_m8_attrs = recv_wsc_msg(hapd, WSC_MSG, WPS_M8) m8_cred = decrypt_attr_encr_settings(authkey, keywrapkey, m8_attrs[ATTR_ENCR_SETTINGS]) - logger.debug("M8 Credential: " + binascii.hexlify(m8_cred)) + logger.debug("M8 Credential: " + binascii.hexlify(m8_cred).decode()) logger.debug("Prepare WSC_Done") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -6524,23 +6610,24 @@ def test_wps_ext(dev, apdev): # Do not send WSC_Done yet to allow exchangw with STA complete before the # AP disconnects. - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' eap_id = wsc_start_id logger.debug("Send WSC/Start to STA") - wsc_start = build_eap_wsc(1, eap_id, "", opcode=WSC_Start) + wsc_start = build_eap_wsc(1, eap_id, b'', opcode=WSC_Start) send_wsc_msg(dev[0], bssid, wsc_start) eap_id = (eap_id + 1) % 256 logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -6619,25 +6706,26 @@ def test_wps_ext(dev, apdev): def wps_start_kwa(dev, apdev): pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -6675,10 +6763,10 @@ def wps_stop_kwa(dev, bssid, attrs, authkey, raw_m3_attrs, eap_id): def test_wps_ext_kwa_proto_no_kwa(dev, apdev): """WPS and KWA error: No KWA attribute""" - r_s1,keywrapkey,authkey,raw_m3_attrs,eap_id,bssid,attrs = wps_start_kwa(dev, apdev) + r_s1, keywrapkey, authkey, raw_m3_attrs, eap_id, bssid, attrs = wps_start_kwa(dev, apdev) data = build_wsc_attr(ATTR_R_SNONCE1, r_s1) # Encrypted Settings without KWA - iv = 16*'\x99' + iv = 16*b'\x99' aes = AES.new(keywrapkey, AES.MODE_CBC, iv) pad_len = 16 - len(data) % 16 ps = pad_len * struct.pack('B', pad_len) @@ -6689,14 +6777,14 @@ def test_wps_ext_kwa_proto_no_kwa(dev, apdev): def test_wps_ext_kwa_proto_data_after_kwa(dev, apdev): """WPS and KWA error: Data after KWA""" - r_s1,keywrapkey,authkey,raw_m3_attrs,eap_id,bssid,attrs = wps_start_kwa(dev, apdev) + r_s1, keywrapkey, authkey, raw_m3_attrs, eap_id, bssid, attrs = wps_start_kwa(dev, apdev) data = build_wsc_attr(ATTR_R_SNONCE1, r_s1) # Encrypted Settings and data after KWA m = hmac.new(authkey, data, hashlib.sha256) kwa = m.digest()[0:8] data += build_wsc_attr(ATTR_KEY_WRAP_AUTH, kwa) data += build_wsc_attr(ATTR_VENDOR_EXT, "1234567890") - iv = 16*'\x99' + iv = 16*b'\x99' aes = AES.new(keywrapkey, AES.MODE_CBC, iv) pad_len = 16 - len(data) % 16 ps = pad_len * struct.pack('B', pad_len) @@ -6707,11 +6795,11 @@ def test_wps_ext_kwa_proto_data_after_kwa(dev, apdev): def test_wps_ext_kwa_proto_kwa_mismatch(dev, apdev): """WPS and KWA error: KWA mismatch""" - r_s1,keywrapkey,authkey,raw_m3_attrs,eap_id,bssid,attrs = wps_start_kwa(dev, apdev) + r_s1, keywrapkey, authkey, raw_m3_attrs, eap_id, bssid, attrs = wps_start_kwa(dev, apdev) data = build_wsc_attr(ATTR_R_SNONCE1, r_s1) # Encrypted Settings and KWA with incorrect value data += build_wsc_attr(ATTR_KEY_WRAP_AUTH, 8*'\x00') - iv = 16*'\x99' + iv = 16*b'\x99' aes = AES.new(keywrapkey, AES.MODE_CBC, iv) pad_len = 16 - len(data) % 16 ps = pad_len * struct.pack('B', pad_len) @@ -6722,25 +6810,26 @@ def test_wps_ext_kwa_proto_kwa_mismatch(dev, apdev): def wps_run_cred_proto(dev, apdev, m8_cred, connect=False, no_connect=False): pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -6834,7 +6923,7 @@ def wps_run_cred_proto(dev, apdev, m8_cred, connect=False, no_connect=False): def build_cred(nw_idx='\x01', ssid='test-wps-conf', auth_type='\x00\x20', encr_type='\x00\x08', nw_key="12345678", mac_addr='\x00\x00\x00\x00\x00\x00'): - attrs = '' + attrs = b'' if nw_idx is not None: attrs += build_wsc_attr(ATTR_NETWORK_INDEX, nw_idx) if ssid is not None: @@ -6918,31 +7007,32 @@ def test_wps_ext_cred_proto_invalid_encr_type(dev, apdev): def test_wps_ext_cred_proto_missing_cred(dev, apdev): """WPS and Credential: Missing Credential""" mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - m8_cred = '' + m8_cred = b'' wps_run_cred_proto(dev, apdev, m8_cred) def test_wps_ext_proto_m2_no_public_key(dev, apdev): """WPS and no Public Key in M2""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -6962,30 +7052,31 @@ def test_wps_ext_proto_m2_no_public_key(dev, apdev): def test_wps_ext_proto_m2_invalid_public_key(dev, apdev): """WPS and invalid Public Key in M2""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce, uuid_r, 192*'\xff') + r_nonce, uuid_r, 192*b'\xff') send_wsc_msg(dev[0], bssid, m2) eap_id = (eap_id + 1) % 256 @@ -7000,25 +7091,26 @@ def test_wps_ext_proto_m2_invalid_public_key(dev, apdev): def test_wps_ext_proto_m2_public_key_oom(dev, apdev): """WPS and Public Key OOM in M2""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -7039,25 +7131,26 @@ def test_wps_ext_proto_m2_public_key_oom(dev, apdev): def test_wps_ext_proto_nack_m3(dev, apdev): """WPS and NACK M3""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -7082,25 +7175,26 @@ def test_wps_ext_proto_nack_m3(dev, apdev): def test_wps_ext_proto_nack_m5(dev, apdev): """WPS and NACK M5""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -7141,25 +7235,26 @@ def test_wps_ext_proto_nack_m5(dev, apdev): def wps_nack_m3(dev, apdev): pin = "00000000" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pbc=True) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pbc=True) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -7246,7 +7341,7 @@ def test_wps_ext_proto_nack_m3_invalid_attr(dev, apdev): """WPS and NACK M3 invalid attribute""" eap_id, e_nonce, r_nonce, bssid = wps_nack_m3(dev, apdev) logger.debug("Send NACK to STA") - attrs = '\x10\x10\x00' + attrs = b'\x10\x10\x00' msg = build_eap_wsc(1, eap_id, attrs, opcode=WSC_NACK) send_wsc_msg(dev[0], bssid, msg) dev[0].request("WPS_CANCEL") @@ -7317,7 +7412,7 @@ def test_wps_ext_proto_ack_m3_invalid_attr(dev, apdev): """WPS and ACK M3 invalid attribute""" eap_id, e_nonce, r_nonce, bssid = wps_nack_m3(dev, apdev) logger.debug("Send ACK to STA") - attrs = '\x10\x10\x00' + attrs = b'\x10\x10\x00' msg = build_eap_wsc(1, eap_id, attrs, opcode=WSC_ACK) send_wsc_msg(dev[0], bssid, msg) dev[0].request("WPS_CANCEL") @@ -7336,25 +7431,26 @@ def test_wps_ext_proto_ack_m3(dev, apdev): def wps_to_m3_helper(dev, apdev): pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) wps_ext_eap_wsc(dev[0], hapd, bssid, "EAP-WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, e_pk = wsc_dh_init() logger.debug("Receive M1 from STA") msg, m1_attrs, raw_m1_attrs = recv_wsc_msg(dev[0], WSC_MSG, WPS_M1) eap_id = (msg['eap_identifier'] + 1) % 256 - authkey,keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, - mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, pin, - m1_attrs[ATTR_PUBLIC_KEY], e_pk) + authkey, keywrapkey = wsc_dh_kdf(m1_attrs[ATTR_PUBLIC_KEY], own_private, + mac_addr, m1_attrs[ATTR_ENROLLEE_NONCE], + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, pin, + m1_attrs[ATTR_PUBLIC_KEY], + e_pk) logger.debug("Send M2 to STA") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, eap_id, @@ -7456,7 +7552,7 @@ def test_wps_ext_proto_m4_missing_r_snonce1(dev, apdev): attrs += build_wsc_attr(ATTR_R_HASH1, r_hash1) attrs += build_wsc_attr(ATTR_R_HASH2, r_hash2) #data = build_wsc_attr(ATTR_R_SNONCE1, r_s1) - data = '' + data = b'' attrs += build_attr_encr_settings(authkey, keywrapkey, data) attrs += build_attr_authenticator(authkey, m3, attrs) m4 = build_eap_wsc(1, eap_id, attrs) @@ -7487,7 +7583,7 @@ def test_wps_ext_proto_m4_invalid_pad_string(dev, apdev): m = hmac.new(authkey, data, hashlib.sha256) kwa = m.digest()[0:8] data += build_wsc_attr(ATTR_KEY_WRAP_AUTH, kwa) - iv = 16*'\x99' + iv = 16*b'\x99' aes = AES.new(keywrapkey, AES.MODE_CBC, iv) pad_len = 16 - len(data) % 16 ps = (pad_len - 1) * struct.pack('B', pad_len) + struct.pack('B', pad_len - 1) @@ -7524,7 +7620,7 @@ def test_wps_ext_proto_m4_invalid_pad_value(dev, apdev): m = hmac.new(authkey, data, hashlib.sha256) kwa = m.digest()[0:8] data += build_wsc_attr(ATTR_KEY_WRAP_AUTH, kwa) - iv = 16*'\x99' + iv = 16*b'\x99' aes = AES.new(keywrapkey, AES.MODE_CBC, iv) pad_len = 16 - len(data) % 16 ps = (pad_len - 1) * struct.pack('B', pad_len) + struct.pack('B', 255) @@ -7579,7 +7675,7 @@ def test_wps_ext_proto_m6_missing_r_snonce2(dev, apdev): attrs += build_attr_msg_type(WPS_M6) attrs += build_wsc_attr(ATTR_ENROLLEE_NONCE, e_nonce) #data = build_wsc_attr(ATTR_R_SNONCE2, r_s2) - data = '' + data = b'' attrs += build_attr_encr_settings(authkey, keywrapkey, data) attrs += build_attr_authenticator(authkey, m5, attrs) m6 = build_eap_wsc(1, eap_id, attrs) @@ -7662,10 +7758,10 @@ def wps_start_ext_reg(apdev, dev): bssid = apdev['bssid'] ssid = "test-wps-conf" appin = "12345670" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "ap_pin": appin } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "ap_pin": appin} hapd = hostapd.add_ap(apdev, params) dev.scan_for_bss(bssid, freq="2412") @@ -7674,10 +7770,10 @@ def wps_start_ext_reg(apdev, dev): dev.request("WPS_REG " + bssid + " " + appin) - return addr,bssid,hapd + return addr, bssid, hapd def wps_run_ap_settings_proto(dev, apdev, ap_settings, success): - addr,bssid,hapd = wps_start_ext_reg(apdev[0], dev[0]) + addr, bssid, hapd = wps_start_ext_reg(apdev[0], dev[0]) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -7688,12 +7784,12 @@ def wps_run_ap_settings_proto(dev, apdev, ap_settings, success): e_pk = m1_attrs[ATTR_PUBLIC_KEY] appin = '12345670' - uuid_r = 16*'\x33' - r_nonce = 16*'\x44' + uuid_r = 16*b'\x33' + r_nonce = 16*b'\x44' own_private, r_pk = wsc_dh_init() - authkey,keywrapkey = wsc_dh_kdf(e_pk, own_private, mac_addr, e_nonce, - r_nonce) - r_s1,r_s2,r_hash1,r_hash2 = wsc_dev_pw_hash(authkey, appin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(e_pk, own_private, mac_addr, e_nonce, + r_nonce) + r_s1, r_s2, r_hash1, r_hash2 = wsc_dev_pw_hash(authkey, appin, e_pk, r_pk) logger.debug("Send M2 to AP") m2, raw_m2_attrs = build_m2(authkey, raw_m1_attrs, msg['eap_identifier'], @@ -7754,8 +7850,8 @@ def wps_run_ap_settings_proto(dev, apdev, ap_settings, success): raise Exception("Unexpected message - expected WSC_Done") logger.debug("Send WSC_ACK to AP") - ack,attrs = build_ack(msg['eap_identifier'], e_nonce, r_nonce, - eap_code=2) + ack, attrs = build_ack(msg['eap_identifier'], e_nonce, r_nonce, + eap_code=2) send_wsc_msg(hapd, addr, ack) dev[0].wait_disconnected() else: @@ -7768,8 +7864,8 @@ def wps_run_ap_settings_proto(dev, apdev, ap_settings, success): raise Exception("Unexpected message - expected WSC_NACK") logger.debug("Send WSC_NACK to AP") - nack,attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, - eap_code=2) + nack, attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, + eap_code=2) send_wsc_msg(hapd, addr, nack) dev[0].wait_disconnected() @@ -7823,7 +7919,7 @@ def test_wps_ext_ap_settings_reject_encr_type(dev, apdev): @remote_compatible def test_wps_ext_ap_settings_m2d(dev, apdev): """WPS and AP Settings: M2D""" - addr,bssid,hapd = wps_start_ext_reg(apdev[0], dev[0]) + addr, bssid, hapd = wps_start_ext_reg(apdev[0], dev[0]) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -7853,8 +7949,8 @@ def wps_wait_ap_nack(hapd, dev, e_nonce, r_nonce): raise Exception("Unexpected message - expected WSC_NACK") logger.debug("Send WSC_NACK to AP") - nack,attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, - eap_code=2) + nack, attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, + eap_code=2) send_wsc_msg(hapd, dev.own_addr(), nack) dev.wait_disconnected() @@ -7862,7 +7958,7 @@ def wps_wait_ap_nack(hapd, dev, e_nonce, r_nonce): def test_wps_ext_m3_missing_e_hash1(dev, apdev): """WPS proto: M3 missing E-Hash1""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -7872,8 +7968,8 @@ def test_wps_ext_m3_missing_e_hash1(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -7886,9 +7982,9 @@ def test_wps_ext_m3_missing_e_hash1(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -7907,7 +8003,7 @@ def test_wps_ext_m3_missing_e_hash1(dev, apdev): def test_wps_ext_m3_missing_e_hash2(dev, apdev): """WPS proto: M3 missing E-Hash2""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -7917,8 +8013,8 @@ def test_wps_ext_m3_missing_e_hash2(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -7931,9 +8027,9 @@ def test_wps_ext_m3_missing_e_hash2(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -7952,7 +8048,7 @@ def test_wps_ext_m3_missing_e_hash2(dev, apdev): def test_wps_ext_m5_missing_e_snonce1(dev, apdev): """WPS proto: M5 missing E-SNonce1""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -7962,8 +8058,8 @@ def test_wps_ext_m5_missing_e_snonce1(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -7976,9 +8072,9 @@ def test_wps_ext_m5_missing_e_snonce1(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -7999,7 +8095,7 @@ def test_wps_ext_m5_missing_e_snonce1(dev, apdev): attrs += build_attr_msg_type(WPS_M5) attrs += build_wsc_attr(ATTR_REGISTRAR_NONCE, r_nonce) #data = build_wsc_attr(ATTR_E_SNONCE1, e_s1) - data = '' + data = b'' attrs += build_attr_encr_settings(authkey, keywrapkey, data) attrs += build_attr_authenticator(authkey, raw_m4_attrs, attrs) raw_m5_attrs = attrs @@ -8012,7 +8108,7 @@ def test_wps_ext_m5_missing_e_snonce1(dev, apdev): def test_wps_ext_m5_e_snonce1_mismatch(dev, apdev): """WPS proto: M5 E-SNonce1 mismatch""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8022,8 +8118,8 @@ def test_wps_ext_m5_e_snonce1_mismatch(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8036,9 +8132,9 @@ def test_wps_ext_m5_e_snonce1_mismatch(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8070,7 +8166,7 @@ def test_wps_ext_m5_e_snonce1_mismatch(dev, apdev): def test_wps_ext_m7_missing_e_snonce2(dev, apdev): """WPS proto: M7 missing E-SNonce2""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8080,8 +8176,8 @@ def test_wps_ext_m7_missing_e_snonce2(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8094,9 +8190,9 @@ def test_wps_ext_m7_missing_e_snonce2(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8131,7 +8227,7 @@ def test_wps_ext_m7_missing_e_snonce2(dev, apdev): attrs += build_attr_msg_type(WPS_M7) attrs += build_wsc_attr(ATTR_REGISTRAR_NONCE, r_nonce) #data = build_wsc_attr(ATTR_E_SNONCE2, e_s2) - data = '' + data = b'' attrs += build_attr_encr_settings(authkey, keywrapkey, data) attrs += build_attr_authenticator(authkey, raw_m6_attrs, attrs) m7 = build_eap_wsc(2, msg['eap_identifier'], attrs) @@ -8144,7 +8240,7 @@ def test_wps_ext_m7_missing_e_snonce2(dev, apdev): def test_wps_ext_m7_e_snonce2_mismatch(dev, apdev): """WPS proto: M7 E-SNonce2 mismatch""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8154,8 +8250,8 @@ def test_wps_ext_m7_e_snonce2_mismatch(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8168,9 +8264,9 @@ def test_wps_ext_m7_e_snonce2_mismatch(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8217,7 +8313,7 @@ def test_wps_ext_m7_e_snonce2_mismatch(dev, apdev): def test_wps_ext_m1_pubkey_oom(dev, apdev): """WPS proto: M1 PubKey OOM""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8248,7 +8344,7 @@ def wps_wait_eap_failure(hapd, dev): def test_wps_ext_m3_m1(dev, apdev): """WPS proto: M3 replaced with M1""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8258,8 +8354,8 @@ def test_wps_ext_m3_m1(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8272,9 +8368,9 @@ def test_wps_ext_m3_m1(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3(M1) to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8293,7 +8389,7 @@ def test_wps_ext_m3_m1(dev, apdev): def test_wps_ext_m5_m3(dev, apdev): """WPS proto: M5 replaced with M3""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8303,8 +8399,8 @@ def test_wps_ext_m5_m3(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8317,9 +8413,9 @@ def test_wps_ext_m5_m3(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8352,7 +8448,7 @@ def test_wps_ext_m5_m3(dev, apdev): def test_wps_ext_m3_m2(dev, apdev): """WPS proto: M3 replaced with M2""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8362,8 +8458,8 @@ def test_wps_ext_m3_m2(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8376,9 +8472,9 @@ def test_wps_ext_m3_m2(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3(M2) to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8395,7 +8491,7 @@ def test_wps_ext_m3_m2(dev, apdev): def test_wps_ext_m3_m5(dev, apdev): """WPS proto: M3 replaced with M5""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8405,8 +8501,8 @@ def test_wps_ext_m3_m5(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8419,9 +8515,9 @@ def test_wps_ext_m3_m5(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3(M5) to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8440,7 +8536,7 @@ def test_wps_ext_m3_m5(dev, apdev): def test_wps_ext_m3_m7(dev, apdev): """WPS proto: M3 replaced with M7""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8450,8 +8546,8 @@ def test_wps_ext_m3_m7(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8464,9 +8560,9 @@ def test_wps_ext_m3_m7(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3(M7) to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8485,7 +8581,7 @@ def test_wps_ext_m3_m7(dev, apdev): def test_wps_ext_m3_done(dev, apdev): """WPS proto: M3 replaced with WSC_Done""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8495,8 +8591,8 @@ def test_wps_ext_m3_done(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8509,9 +8605,9 @@ def test_wps_ext_m3_done(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3(WSC_Done) to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -8527,7 +8623,7 @@ def test_wps_ext_m3_done(dev, apdev): def test_wps_ext_m2_nack_invalid(dev, apdev): """WPS proto: M2 followed by invalid NACK""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8537,8 +8633,8 @@ def test_wps_ext_m2_nack_invalid(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8551,12 +8647,12 @@ def test_wps_ext_m2_nack_invalid(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_NACK to AP") - attrs = '\x10\x00\x00' + attrs = b'\x10\x00\x00' nack = build_eap_wsc(2, msg['eap_identifier'], attrs, opcode=WSC_NACK) send_wsc_msg(hapd, addr, nack) @@ -8566,7 +8662,7 @@ def test_wps_ext_m2_nack_invalid(dev, apdev): def test_wps_ext_m2_nack_no_msg_type(dev, apdev): """WPS proto: M2 followed by NACK without Msg Type""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8576,8 +8672,8 @@ def test_wps_ext_m2_nack_no_msg_type(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8590,13 +8686,13 @@ def test_wps_ext_m2_nack_no_msg_type(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_NACK to AP") - nack,attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, - msg_type=None, eap_code=2) + nack, attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, + msg_type=None, eap_code=2) send_wsc_msg(hapd, addr, nack) wps_wait_eap_failure(hapd, dev[0]) @@ -8605,7 +8701,7 @@ def test_wps_ext_m2_nack_no_msg_type(dev, apdev): def test_wps_ext_m2_nack_invalid_msg_type(dev, apdev): """WPS proto: M2 followed by NACK with invalid Msg Type""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8615,8 +8711,8 @@ def test_wps_ext_m2_nack_invalid_msg_type(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8629,13 +8725,13 @@ def test_wps_ext_m2_nack_invalid_msg_type(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_NACK to AP") - nack,attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, - msg_type=WPS_WSC_ACK, eap_code=2) + nack, attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, + msg_type=WPS_WSC_ACK, eap_code=2) send_wsc_msg(hapd, addr, nack) wps_wait_eap_failure(hapd, dev[0]) @@ -8644,7 +8740,7 @@ def test_wps_ext_m2_nack_invalid_msg_type(dev, apdev): def test_wps_ext_m2_nack_e_nonce_mismatch(dev, apdev): """WPS proto: M2 followed by NACK with e-nonce mismatch""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8654,8 +8750,8 @@ def test_wps_ext_m2_nack_e_nonce_mismatch(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8668,13 +8764,13 @@ def test_wps_ext_m2_nack_e_nonce_mismatch(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_NACK to AP") - nack,attrs = build_nack(msg['eap_identifier'], 16*'\x00', r_nonce, - eap_code=2) + nack, attrs = build_nack(msg['eap_identifier'], 16*b'\x00', r_nonce, + eap_code=2) send_wsc_msg(hapd, addr, nack) wps_wait_eap_failure(hapd, dev[0]) @@ -8683,7 +8779,7 @@ def test_wps_ext_m2_nack_e_nonce_mismatch(dev, apdev): def test_wps_ext_m2_nack_no_config_error(dev, apdev): """WPS proto: M2 followed by NACK without Config Error""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8693,8 +8789,8 @@ def test_wps_ext_m2_nack_no_config_error(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8707,13 +8803,13 @@ def test_wps_ext_m2_nack_no_config_error(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_NACK to AP") - nack,attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, - config_error=None, eap_code=2) + nack, attrs = build_nack(msg['eap_identifier'], e_nonce, r_nonce, + config_error=None, eap_code=2) send_wsc_msg(hapd, addr, nack) wps_wait_eap_failure(hapd, dev[0]) @@ -8722,7 +8818,7 @@ def test_wps_ext_m2_nack_no_config_error(dev, apdev): def test_wps_ext_m2_ack_invalid(dev, apdev): """WPS proto: M2 followed by invalid ACK""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8732,8 +8828,8 @@ def test_wps_ext_m2_ack_invalid(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8746,12 +8842,12 @@ def test_wps_ext_m2_ack_invalid(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_ACK to AP") - attrs = '\x10\x00\x00' + attrs = b'\x10\x00\x00' ack = build_eap_wsc(2, msg['eap_identifier'], attrs, opcode=WSC_ACK) send_wsc_msg(hapd, addr, ack) @@ -8761,7 +8857,7 @@ def test_wps_ext_m2_ack_invalid(dev, apdev): def test_wps_ext_m2_ack(dev, apdev): """WPS proto: M2 followed by ACK""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8771,8 +8867,8 @@ def test_wps_ext_m2_ack(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8785,12 +8881,12 @@ def test_wps_ext_m2_ack(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_ACK to AP") - ack,attrs = build_ack(msg['eap_identifier'], e_nonce, r_nonce, eap_code=2) + ack, attrs = build_ack(msg['eap_identifier'], e_nonce, r_nonce, eap_code=2) send_wsc_msg(hapd, addr, ack) wps_wait_eap_failure(hapd, dev[0]) @@ -8799,7 +8895,7 @@ def test_wps_ext_m2_ack(dev, apdev): def test_wps_ext_m2_ack_no_msg_type(dev, apdev): """WPS proto: M2 followed by ACK missing Msg Type""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8809,8 +8905,8 @@ def test_wps_ext_m2_ack_no_msg_type(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8823,13 +8919,13 @@ def test_wps_ext_m2_ack_no_msg_type(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_ACK to AP") - ack,attrs = build_ack(msg['eap_identifier'], e_nonce, r_nonce, - msg_type=None, eap_code=2) + ack, attrs = build_ack(msg['eap_identifier'], e_nonce, r_nonce, + msg_type=None, eap_code=2) send_wsc_msg(hapd, addr, ack) wps_wait_eap_failure(hapd, dev[0]) @@ -8838,7 +8934,7 @@ def test_wps_ext_m2_ack_no_msg_type(dev, apdev): def test_wps_ext_m2_ack_invalid_msg_type(dev, apdev): """WPS proto: M2 followed by ACK with invalid Msg Type""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8848,8 +8944,8 @@ def test_wps_ext_m2_ack_invalid_msg_type(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8862,12 +8958,12 @@ def test_wps_ext_m2_ack_invalid_msg_type(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_ACK to AP") - ack,attrs = build_ack(msg['eap_identifier'], e_nonce, r_nonce, + ack, attrs = build_ack(msg['eap_identifier'], e_nonce, r_nonce, msg_type=WPS_WSC_NACK, eap_code=2) send_wsc_msg(hapd, addr, ack) @@ -8877,7 +8973,7 @@ def test_wps_ext_m2_ack_invalid_msg_type(dev, apdev): def test_wps_ext_m2_ack_e_nonce_mismatch(dev, apdev): """WPS proto: M2 followed by ACK with e-nonce mismatch""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8887,8 +8983,8 @@ def test_wps_ext_m2_ack_e_nonce_mismatch(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8901,13 +8997,13 @@ def test_wps_ext_m2_ack_e_nonce_mismatch(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send WSC_ACK to AP") - ack,attrs = build_ack(msg['eap_identifier'], 16*'\x00', r_nonce, - eap_code=2) + ack, attrs = build_ack(msg['eap_identifier'], 16*b'\x00', r_nonce, + eap_code=2) send_wsc_msg(hapd, addr, ack) wps_wait_eap_failure(hapd, dev[0]) @@ -8916,7 +9012,7 @@ def test_wps_ext_m2_ack_e_nonce_mismatch(dev, apdev): def test_wps_ext_m1_invalid(dev, apdev): """WPS proto: M1 failing parsing""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8926,7 +9022,7 @@ def test_wps_ext_m1_invalid(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") logger.debug("Send M1 to AP") - attrs = '\x10\x00\x00' + attrs = b'\x10\x00\x00' m1 = build_eap_wsc(2, msg['eap_identifier'], attrs) send_wsc_msg(hapd, addr, m1) @@ -8935,7 +9031,7 @@ def test_wps_ext_m1_invalid(dev, apdev): def test_wps_ext_m1_missing_msg_type(dev, apdev): """WPS proto: M1 missing Msg Type""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8949,11 +9045,11 @@ def test_wps_ext_m1_missing_msg_type(dev, apdev): m1 = build_eap_wsc(2, msg['eap_identifier'], attrs) send_wsc_msg(hapd, addr, m1) - wps_wait_ap_nack(hapd, dev[0], 16*'\x00', 16*'\x00') + wps_wait_ap_nack(hapd, dev[0], 16*b'\x00', 16*b'\x00') def wps_ext_wsc_done(dev, apdev): pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -8963,8 +9059,8 @@ def wps_ext_wsc_done(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -8977,9 +9073,9 @@ def wps_ext_wsc_done(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -9030,7 +9126,7 @@ def test_wps_ext_wsc_done_invalid(dev, apdev): hapd, msg, e_nonce, r_nonce = wps_ext_wsc_done(dev, apdev) logger.debug("Send WSC_Done to AP") - attrs = '\x10\x00\x00' + attrs = b'\x10\x00\x00' wsc_done = build_eap_wsc(2, msg['eap_identifier'], attrs, opcode=WSC_Done) send_wsc_msg(hapd, dev[0].own_addr(), wsc_done) @@ -9099,7 +9195,7 @@ def test_wps_ext_wsc_done_no_r_nonce(dev, apdev): def test_wps_ext_m7_no_encr_settings(dev, apdev): """WPS proto: M7 without Encr Settings""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -9109,8 +9205,8 @@ def test_wps_ext_m7_no_encr_settings(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") @@ -9123,9 +9219,9 @@ def test_wps_ext_m7_no_encr_settings(dev, apdev): r_nonce = m2_attrs[ATTR_REGISTRAR_NONCE] r_pk = m2_attrs[ATTR_PUBLIC_KEY] - authkey,keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, - r_nonce) - e_s1,e_s2,e_hash1,e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) + authkey, keywrapkey = wsc_dh_kdf(r_pk, own_private, mac_addr, e_nonce, + r_nonce) + e_s1, e_s2, e_hash1, e_hash2 = wsc_dev_pw_hash(authkey, pin, e_pk, r_pk) logger.debug("Send M3 to AP") attrs = build_wsc_attr(ATTR_VERSION, '\x10') @@ -9172,7 +9268,7 @@ def test_wps_ext_m7_no_encr_settings(dev, apdev): def test_wps_ext_m1_workaround(dev, apdev): """WPS proto: M1 Manufacturer/Model workaround""" pin = "12345670" - addr,bssid,hapd = wps_start_ext(apdev[0], dev[0], pin=pin) + addr, bssid, hapd = wps_start_ext(apdev[0], dev[0], pin=pin) wps_ext_eap_identity_req(dev[0], hapd, bssid) wps_ext_eap_identity_resp(hapd, dev[0], addr) @@ -9182,14 +9278,14 @@ def test_wps_ext_m1_workaround(dev, apdev): raise Exception("Unexpected Op-Code for WSC/Start") mac_addr = binascii.unhexlify(dev[0].own_addr().replace(':', '')) - uuid_e = 16*'\x11' - e_nonce = 16*'\x22' + uuid_e = 16*b'\x11' + e_nonce = 16*b'\x22' own_private, e_pk = wsc_dh_init() logger.debug("Send M1 to AP") m1, raw_m1_attrs = build_m1(msg['eap_identifier'], uuid_e, mac_addr, e_nonce, e_pk, manufacturer='Apple TEST', - model_name='AirPort', config_methods='\xff\xff') + model_name='AirPort', config_methods=b'\xff\xff') send_wsc_msg(hapd, addr, m1) logger.debug("Receive M2 from AP") @@ -9209,12 +9305,12 @@ def test_ap_wps_upnp_web_oom(dev, apdev, params): hapd = add_ssdp_ap(apdev[0], ap_uuid) location = ssdp_get_location(ap_uuid) - url = urlparse.urlparse(location) + url = urlparse(location) urls = upnp_get_urls(location) - eventurl = urlparse.urlparse(urls['event_sub_url']) - ctrlurl = urlparse.urlparse(urls['control_url']) + eventurl = urlparse(urls['event_sub_url']) + ctrlurl = urlparse(urls['control_url']) - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) with alloc_fail(hapd, 1, "web_connection_parse_get"): conn.request("GET", "/wps_device.xml") try: @@ -9222,7 +9318,7 @@ def test_ap_wps_upnp_web_oom(dev, apdev, params): except: pass - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("GET", "/unknown") resp = conn.getresponse() if resp.status != 404: @@ -9232,60 +9328,60 @@ def test_ap_wps_upnp_web_oom(dev, apdev, params): conn.request("GET", "/unknown") try: resp = conn.getresponse() - print resp.status + print(resp.status) except: pass - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("GET", "/wps_device.xml") resp = conn.getresponse() if resp.status != 200: raise Exception("GET /wps_device.xml failed") - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo") if resp.status != 200: raise Exception("GetDeviceInfo failed") with alloc_fail(hapd, 1, "web_process_get_device_info"): - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo") if resp.status != 500: raise Exception("Internal error not reported from GetDeviceInfo OOM") with alloc_fail(hapd, 1, "wps_build_m1;web_process_get_device_info"): - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo") if resp.status != 500: raise Exception("Internal error not reported from GetDeviceInfo OOM") with alloc_fail(hapd, 1, "wpabuf_alloc;web_connection_send_reply"): - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) try: resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo") except: pass - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo") if resp.status != 200: raise Exception("GetDeviceInfo failed") # No NewWLANEventType in PutWLANResponse NewMessage - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse", newmsg="foo") if resp.status != 600: raise Exception("Unexpected HTTP response: %d" % resp.status) # No NewWLANEventMAC in PutWLANResponse NewMessage - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse", newmsg="foo", neweventtype="1") if resp.status != 600: raise Exception("Unexpected HTTP response: %d" % resp.status) # Invalid NewWLANEventMAC in PutWLANResponse NewMessage - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse", newmsg="foo", neweventtype="1", neweventmac="foo") @@ -9294,7 +9390,7 @@ def test_ap_wps_upnp_web_oom(dev, apdev, params): # Workaround for NewWLANEventMAC in PutWLANResponse NewMessage # Ignored unexpected PutWLANResponse WLANEventType 1 - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse", newmsg="foo", neweventtype="1", neweventmac="00.11.22.33.44.55") @@ -9302,7 +9398,7 @@ def test_ap_wps_upnp_web_oom(dev, apdev, params): raise Exception("Unexpected HTTP response: %d" % resp.status) # PutWLANResponse NewMessage with invalid EAP message - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse", newmsg="foo", neweventtype="2", neweventmac="00:11:22:33:44:55") @@ -9310,10 +9406,10 @@ def test_ap_wps_upnp_web_oom(dev, apdev, params): raise Exception("Unexpected HTTP response: %d" % resp.status) with alloc_fail(hapd, 1, "web_connection_parse_subscribe"): - conn = httplib.HTTPConnection(url.netloc) - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + conn = HTTPConnection(url.netloc) + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) try: resp = conn.getresponse() @@ -9321,20 +9417,20 @@ def test_ap_wps_upnp_web_oom(dev, apdev, params): pass with alloc_fail(hapd, 1, "dup_binstr;web_connection_parse_subscribe"): - conn = httplib.HTTPConnection(url.netloc) - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + conn = HTTPConnection(url.netloc) + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers) resp = conn.getresponse() if resp.status != 500: raise Exception("Unexpected HTTP response: %d" % resp.status) with alloc_fail(hapd, 1, "wpabuf_alloc;web_connection_parse_unsubscribe"): - conn = httplib.HTTPConnection(url.netloc) - headers = { "callback": '', - "NT": "upnp:event", - "timeout": "Second-1234" } + conn = HTTPConnection(url.netloc) + headers = {"callback": '', + "NT": "upnp:event", + "timeout": "Second-1234"} conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers) try: resp = conn.getresponse() @@ -9342,7 +9438,7 @@ def test_ap_wps_upnp_web_oom(dev, apdev, params): pass with alloc_fail(hapd, 1, "web_connection_unimplemented"): - conn = httplib.HTTPConnection(url.netloc) + conn = HTTPConnection(url.netloc) conn.request("HEAD", "/wps_device.xml") try: resp = conn.getresponse() @@ -9370,10 +9466,10 @@ def test_ap_wps_eap_wsc_errors(dev, apdev): """WPS and EAP-WSC error cases""" ssid = "test-wps-conf-pin" appin = "12345670" - params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", - "fragment_size": "300", "ap_pin": appin } + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "fragment_size": "300", "ap_pin": appin} hapd = hostapd.add_ap(apdev[0], params) bssid = apdev[0]['bssid'] @@ -9422,9 +9518,9 @@ def test_ap_wps_eap_wsc_errors(dev, apdev): wait_scan_stopped(dev[0]) dev[0].dump_monitor() - tests = [ "eap_wsc_init", - "eap_msg_alloc;eap_wsc_build_msg", - "wpabuf_alloc;eap_wsc_process_fragment" ] + tests = ["eap_wsc_init", + "eap_msg_alloc;eap_wsc_build_msg", + "wpabuf_alloc;eap_wsc_process_fragment"] for func in tests: with alloc_fail(dev[0], 1, func): dev[0].request("WPS_PIN %s %s" % (bssid, pin)) @@ -9434,6 +9530,18 @@ def test_ap_wps_eap_wsc_errors(dev, apdev): wait_scan_stopped(dev[0]) dev[0].dump_monitor() + tests = [(1, "wps_decrypt_encr_settings"), + (2, "hmac_sha256;wps_derive_psk")] + for count, func in tests: + hapd.request("WPS_PIN any " + pin) + with fail_test(dev[0], count, func): + dev[0].request("WPS_PIN %s %s" % (bssid, pin)) + wait_fail_trigger(dev[0], "GET_FAIL") + dev[0].request("WPS_CANCEL") + dev[0].wait_disconnected() + wait_scan_stopped(dev[0]) + dev[0].dump_monitor() + with alloc_fail(dev[0], 1, "eap_msg_alloc;eap_sm_build_expanded_nak"): dev[0].wps_reg(bssid, appin + " new_ssid=a", "new ssid", "WPA2PSK", "CCMP", "new passphrase", no_wait=True) @@ -9564,14 +9672,14 @@ def test_ap_wps_and_bss_limit(dev, apdev): pass def _test_ap_wps_and_bss_limit(dev, apdev): - params = { "ssid": "test-wps", "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "12345678", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" } + params = {"ssid": "test-wps", "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"} hapd = hostapd.add_ap(apdev[0], params) - params = { "ssid": "test-wps-2", "eap_server": "1", "wps_state": "2", - "wpa_passphrase": "1234567890", "wpa": "2", - "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" } + params = {"ssid": "test-wps-2", "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "1234567890", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"} hapd2 = hostapd.add_ap(apdev[1], params) id = dev[1].add_network() @@ -9627,3 +9735,499 @@ def _test_ap_wps_and_bss_limit(dev, apdev): dev[0].set_network(id, "key_mgmt", "WPS") dev[0].scan(freq="2412") + +def test_ap_wps_pbc_2ap(dev, apdev): + """WPS PBC with two APs advertising same SSID""" + params = {"ssid": "wps", "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wps_independent": "1"} + hapd = hostapd.add_ap(apdev[0], params) + params = {"ssid": "wps", "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "123456789", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wps_independent": "1"} + hapd2 = hostapd.add_ap(apdev[1], params) + hapd.request("WPS_PBC") + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add("wlan5", drv_params="force_connect_cmd=1") + wpas.dump_monitor() + wpas.flush_scan_cache() + + wpas.scan_for_bss(apdev[0]['bssid'], freq="2412", force_scan=True) + wpas.scan_for_bss(apdev[1]['bssid'], freq="2412") + wpas.request("WPS_PBC") + wpas.wait_connected() + wpas.request("DISCONNECT") + hapd.request("DISABLE") + hapd2.request("DISABLE") + wpas.flush_scan_cache() + +def test_ap_wps_er_enrollee_to_conf_ap(dev, apdev): + """WPS ER enrolling a new device to a configured AP""" + try: + _test_ap_wps_er_enrollee_to_conf_ap(dev, apdev) + finally: + dev[0].request("WPS_ER_STOP") + +def _test_ap_wps_er_enrollee_to_conf_ap(dev, apdev): + ssid = "wps-er-enrollee-to-conf-ap" + ap_pin = "12345670" + ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"} + hapd = hostapd.add_ap(apdev[0], params) + bssid = hapd.own_addr() + + id = dev[0].connect(ssid, psk="12345678", scan_freq="2412") + dev[0].dump_monitor() + + dev[0].request("WPS_ER_START ifname=lo") + ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15) + if ev is None: + raise Exception("AP discovery timed out") + if ap_uuid not in ev: + raise Exception("Expected AP UUID not found") + + pin = dev[2].wps_read_pin() + addr2 = dev[2].own_addr() + dev[0].dump_monitor() + dev[2].scan_for_bss(bssid, freq=2412) + dev[2].dump_monitor() + dev[2].request("WPS_PIN %s %s" % (bssid, pin)) + + for i in range(3): + ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10) + if ev is None: + raise Exception("Enrollee not seen") + if addr2 in ev: + break + if addr2 not in ev: + raise Exception("Unexpected Enrollee MAC address") + dev[0].dump_monitor() + + dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " " + str(id)) + dev[0].request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2) + dev[2].wait_connected(timeout=30) + ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15) + if ev is None: + raise Exception("WPS ER did not report success") + +def test_ap_wps_er_enrollee_to_conf_ap2(dev, apdev): + """WPS ER enrolling a new device to a configured AP (2)""" + try: + _test_ap_wps_er_enrollee_to_conf_ap2(dev, apdev) + finally: + dev[0].request("WPS_ER_STOP") + +def _test_ap_wps_er_enrollee_to_conf_ap2(dev, apdev): + ssid = "wps-er-enrollee-to-conf-ap" + ap_pin = "12345670" + ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e" + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "device_name": "Wireless AP", "manufacturer": "Company", + "model_name": "WAP", "model_number": "123", + "serial_number": "12345", "device_type": "6-0050F204-1", + "os_version": "01020300", + "config_methods": "label push_button", + "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"} + hapd = hostapd.add_ap(apdev[0], params) + bssid = hapd.own_addr() + + id = dev[0].connect(ssid, psk="12345678", scan_freq="2412") + dev[0].dump_monitor() + + dev[0].request("WPS_ER_START ifname=lo") + ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15) + if ev is None: + raise Exception("AP discovery timed out") + if ap_uuid not in ev: + raise Exception("Expected AP UUID not found") + + dev[0].request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin) + ev = dev[0].wait_event(["WPS-ER-AP-SETTINGS"], timeout=15) + if ev is None: + raise Exception("AP learn timed out") + if ap_uuid not in ev: + raise Exception("Expected AP UUID not in settings") + ev = dev[0].wait_event(["WPS-FAIL"], timeout=15) + if ev is None: + raise Exception("WPS-FAIL after AP learn timed out") + time.sleep(0.1) + + pin = dev[1].wps_read_pin() + addr1 = dev[1].own_addr() + dev[0].dump_monitor() + dev[0].request("WPS_ER_PIN any " + pin) + time.sleep(0.1) + dev[1].scan_for_bss(bssid, freq=2412) + dev[1].request("WPS_PIN any %s" % pin) + ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=30) + if ev is None: + raise Exception("Enrollee did not report success") + dev[1].wait_connected(timeout=15) + ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15) + if ev is None: + raise Exception("WPS ER did not report success") + +def test_ap_wps_ignore_broadcast_ssid(dev, apdev): + """WPS AP trying to ignore broadcast SSID""" + ssid = "test-wps" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "1", + "ignore_broadcast_ssid": "1"}) + if "FAIL" not in hapd.request("WPS_PBC"): + raise Exception("WPS unexpectedly enabled") + +def test_ap_wps_wep(dev, apdev): + """WPS AP trying to enable WEP""" + ssid = "test-wps" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "1", + "ieee80211n": "0", "wep_key0": '"hello"'}) + if "FAIL" not in hapd.request("WPS_PBC"): + raise Exception("WPS unexpectedly enabled") + +def test_ap_wps_tkip(dev, apdev): + """WPS AP trying to enable TKIP""" + ssid = "test-wps" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "1", + "ieee80211n": "0", "wpa": '1', + "wpa_key_mgmt": "WPA-PSK", + "wpa_passphrase": "12345678"}) + if "FAIL" not in hapd.request("WPS_PBC"): + raise Exception("WPS unexpectedly enabled") + +def test_ap_wps_conf_dummy_cred(dev, apdev): + """WPS PIN provisioning with configured AP using dummy cred""" + ssid = "test-wps-conf" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + hapd.request("WPS_PIN any 12345670") + dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") + dev[0].dump_monitor() + try: + hapd.set("wps_testing_dummy_cred", "1") + dev[0].request("WPS_PIN " + apdev[0]['bssid'] + " 12345670") + for i in range(1, 3): + ev = dev[0].wait_event(["WPS-CRED-RECEIVED"], timeout=15) + if ev is None: + raise Exception("WPS credential %d not received" % i) + dev[0].wait_connected(timeout=30) + finally: + hapd.set("wps_testing_dummy_cred", "0") + +def test_ap_wps_rf_bands(dev, apdev): + """WPS and wps_rf_bands configuration""" + ssid = "test-wps-conf" + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "wps_rf_bands": "ag"} + + hapd = hostapd.add_ap(apdev[0], params) + bssid = hapd.own_addr() + hapd.request("WPS_PBC") + dev[0].scan_for_bss(bssid, freq="2412") + dev[0].dump_monitor() + dev[0].request("WPS_PBC " + bssid) + dev[0].wait_connected(timeout=30) + bss = dev[0].get_bss(bssid) + logger.info("BSS: " + str(bss)) + if "103c000103" not in bss['ie']: + raise Exception("RF Bands attribute with expected values not found") + dev[0].request("DISCONNECT") + dev[0].wait_disconnected() + hapd.set("wps_rf_bands", "ad") + hapd.set("wps_rf_bands", "a") + hapd.set("wps_rf_bands", "g") + hapd.set("wps_rf_bands", "b") + hapd.set("wps_rf_bands", "ga") + hapd.disable() + dev[0].dump_monitor() + dev[0].flush_scan_cache() + +def test_ap_wps_pbc_in_m1(dev, apdev): + """WPS and pbc_in_m1""" + ssid = "test-wps-conf" + params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", + "config_methods": "virtual_push_button virtual_display", + "pbc_in_m1": "1"} + + hapd = hostapd.add_ap(apdev[0], params) + bssid = hapd.own_addr() + hapd.request("WPS_PBC") + dev[0].scan_for_bss(bssid, freq="2412") + dev[0].dump_monitor() + dev[0].request("WPS_PBC " + bssid) + dev[0].wait_connected(timeout=30) + dev[0].request("DISCONNECT") + dev[0].wait_disconnected() + hapd.disable() + dev[0].dump_monitor() + dev[0].flush_scan_cache() + +def test_ap_wps_pbc_mac_addr_change(dev, apdev, params): + """WPS M1 with MAC address change""" + ssid = "test-wps-mac-addr-change" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "1"}) + hapd.request("WPS_PBC") + if "PBC Status: Active" not in hapd.request("WPS_GET_STATUS"): + raise Exception("PBC status not shown correctly") + dev[0].flush_scan_cache() + + test_addr = '02:11:22:33:44:55' + addr = dev[0].get_status_field("address") + if addr == test_addr: + raise Exception("Unexpected initial MAC address") + + try: + subprocess.call(['ip', 'link', 'set', 'dev', dev[0].ifname, 'down']) + subprocess.call(['ip', 'link', 'set', 'dev', dev[0].ifname, 'address', + test_addr]) + subprocess.call(['ip', 'link', 'set', 'dev', dev[0].ifname, 'up']) + addr1 = dev[0].get_status_field("address") + if addr1 != test_addr: + raise Exception("Failed to change MAC address") + + dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412", force_scan=True) + dev[0].request("WPS_PBC " + apdev[0]['bssid']) + dev[0].wait_connected(timeout=30) + status = dev[0].get_status() + if status['wpa_state'] != 'COMPLETED' or \ + status['bssid'] != apdev[0]['bssid']: + raise Exception("Not fully connected") + + out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"), + "wps.message_type == 0x04", + display=["wps.mac_address"]) + res = out.splitlines() + + if len(res) < 1: + raise Exception("No M1 message with MAC address found") + if res[0] != addr1: + raise Exception("Wrong M1 MAC address") + dev[0].request("DISCONNECT") + dev[0].wait_disconnected() + hapd.disable() + dev[0].dump_monitor() + dev[0].flush_scan_cache() + finally: + # Restore MAC address + subprocess.call(['ip', 'link', 'set', 'dev', dev[0].ifname, 'down']) + subprocess.call(['ip', 'link', 'set', 'dev', dev[0].ifname, 'address', + addr]) + subprocess.call(['ip', 'link', 'set', 'dev', dev[0].ifname, 'up']) + +def test_ap_wps_pin_start_failure(dev, apdev): + """WPS_PIN start failure""" + with alloc_fail(dev[0], 1, "wpas_wps_start_dev_pw"): + if "FAIL" not in dev[0].request("WPS_PIN any 12345670"): + raise Exception("WPS_PIN not rejected during OOM") + with alloc_fail(dev[0], 1, "wpas_wps_start_dev_pw"): + if "FAIL" not in dev[0].request("WPS_PIN any"): + raise Exception("WPS_PIN not rejected during OOM") + +def test_ap_wps_ap_pin_failure(dev, apdev): + """WPS_AP_PIN failure""" + id = dev[0].add_network() + dev[0].set_network(id, "mode", "2") + dev[0].set_network_quoted(id, "ssid", "wpas-ap-wps") + dev[0].set_network_quoted(id, "psk", "1234567890") + dev[0].set_network(id, "frequency", "2412") + dev[0].set_network(id, "scan_freq", "2412") + dev[0].select_network(id) + dev[0].wait_connected() + + with fail_test(dev[0], 1, + "os_get_random;wpa_supplicant_ctrl_iface_wps_ap_pin"): + if "FAIL" not in dev[0].request("WPS_AP_PIN random"): + raise Exception("WPS_AP_PIN random accepted") + with alloc_fail(dev[0], 1, "wpas_wps_ap_pin_set"): + if "FAIL" not in dev[0].request("WPS_AP_PIN set 12345670"): + raise Exception("WPS_AP_PIN set accepted") + + dev[0].request("DISCONNECT") + dev[0].wait_disconnected() + +def test_ap_wps_random_uuid(dev, apdev, params): + """WPS and random UUID on Enrollee""" + ssid = "test-wps-conf" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}) + + config = os.path.join(params['logdir'], 'ap_wps_random_uuid.conf') + with open(config, "w") as f: + f.write("auto_uuid=1\n") + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + + uuid = [] + for i in range(3): + wpas.interface_add("wlan5", config=config) + + wpas.scan_for_bss(apdev[0]['bssid'], freq="2412") + wpas.dump_monitor() + wpas.request("WPS_PBC " + apdev[0]['bssid']) + + ev = hapd.wait_event(["WPS-ENROLLEE-SEEN"], timeout=10) + if ev is None: + raise Exception("Enrollee not seen") + uuid.append(ev.split(' ')[2]) + wpas.request("WPS_CANCEL") + wpas.dump_monitor() + + wpas.interface_remove("wlan5") + + hapd.dump_monitor() + + logger.info("Seen UUIDs: " + str(uuid)) + if uuid[0] == uuid[1] or uuid[0] == uuid[2] or uuid[1] == uuid[2]: + raise Exception("Same UUID used multiple times") + +def test_ap_wps_conf_pin_gcmp_128(dev, apdev): + """WPS PIN provisioning with configured AP using GCMP-128""" + run_ap_wps_conf_pin_cipher(dev, apdev, "GCMP") + +def test_ap_wps_conf_pin_gcmp_256(dev, apdev): + """WPS PIN provisioning with configured AP using GCMP-256""" + run_ap_wps_conf_pin_cipher(dev, apdev, "GCMP-256") + +def test_ap_wps_conf_pin_ccmp_256(dev, apdev): + """WPS PIN provisioning with configured AP using CCMP-256""" + run_ap_wps_conf_pin_cipher(dev, apdev, "CCMP-256") + +def run_ap_wps_conf_pin_cipher(dev, apdev, cipher): + if cipher not in dev[0].get_capability("pairwise"): + raise HwsimSkip("Cipher %s not supported" % cipher) + ssid = "test-wps-conf-pin" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "wpa_key_mgmt": "WPA-PSK", + "rsn_pairwise": cipher}) + logger.info("WPS provisioning step") + pin = dev[0].wps_read_pin() + hapd.request("WPS_PIN any " + pin) + dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") + dev[0].request("WPS_PIN %s %s" % (apdev[0]['bssid'], pin)) + dev[0].wait_connected(timeout=15) + +def test_ap_wps_and_sae(dev, apdev): + """Initial AP configuration with first WPS Enrollee and adding SAE""" + try: + run_ap_wps_and_sae(dev, apdev) + finally: + dev[0].set("wps_cred_add_sae", "0") + +def run_ap_wps_and_sae(dev, apdev): + ssid = "test-wps-sae" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "1", + "wps_cred_add_sae": "1"}) + logger.info("WPS provisioning step") + pin = dev[0].wps_read_pin() + hapd.request("WPS_PIN any " + pin) + + dev[0].set("wps_cred_add_sae", "1") + dev[0].request("SET sae_groups ") + dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412", force_scan=True) + dev[0].request("WPS_PIN " + apdev[0]['bssid'] + " " + pin) + dev[0].wait_connected(timeout=30) + status = dev[0].get_status() + if status['key_mgmt'] != "SAE": + raise Exception("SAE not used") + if 'pmf' not in status or status['pmf'] != "1": + raise Exception("PMF not enabled") + + pin = dev[1].wps_read_pin() + hapd.request("WPS_PIN any " + pin) + dev[1].scan_for_bss(apdev[0]['bssid'], freq="2412", force_scan=True) + dev[1].request("WPS_PIN " + apdev[0]['bssid'] + " " + pin) + dev[1].wait_connected(timeout=30) + status = dev[1].get_status() + if status['key_mgmt'] != "WPA2-PSK": + raise Exception("WPA2-PSK not used") + if 'pmf' in status: + raise Exception("PMF enabled") + +def test_ap_wps_conf_and_sae(dev, apdev): + """WPS PBC provisioning with configured AP using PSK+SAE""" + try: + run_ap_wps_conf_and_sae(dev, apdev) + finally: + dev[0].set("wps_cred_add_sae", "0") + +def run_ap_wps_conf_and_sae(dev, apdev): + ssid = "test-wps-conf-sae" + hapd = hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "wpa_passphrase": "12345678", "wpa": "2", + "ieee80211w": "1", "sae_require_mfp": "1", + "wpa_key_mgmt": "WPA-PSK SAE", + "rsn_pairwise": "CCMP"}) + + dev[0].set("wps_cred_add_sae", "1") + dev[0].request("SET sae_groups ") + dev[0].scan_for_bss(apdev[0]['bssid'], freq="2412") + pin = dev[0].wps_read_pin() + hapd.request("WPS_PIN any " + pin) + dev[0].request("WPS_PIN " + apdev[0]['bssid'] + " " + pin) + dev[0].wait_connected(timeout=30) + status = dev[0].get_status() + if status['key_mgmt'] != "SAE": + raise Exception("SAE not used") + if 'pmf' not in status or status['pmf'] != "1": + raise Exception("PMF not enabled") + + dev[1].connect(ssid, psk="12345678", scan_freq="2412", proto="WPA2", + key_mgmt="WPA-PSK", ieee80211w="0") + +def test_ap_wps_reg_config_and_sae(dev, apdev): + """WPS registrar configuring an AP using AP PIN and using PSK+SAE""" + try: + run_ap_wps_reg_config_and_sae(dev, apdev) + finally: + dev[0].set("wps_cred_add_sae", "0") + +def run_ap_wps_reg_config_and_sae(dev, apdev): + ssid = "test-wps-init-ap-pin-sae" + appin = "12345670" + hostapd.add_ap(apdev[0], + {"ssid": ssid, "eap_server": "1", "wps_state": "2", + "ap_pin": appin, "wps_cred_add_sae": "1"}) + logger.info("WPS configuration step") + dev[0].set("wps_cred_add_sae", "1") + dev[0].request("SET sae_groups ") + dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412) + dev[0].dump_monitor() + new_ssid = "wps-new-ssid" + new_passphrase = "1234567890" + dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP", + new_passphrase) + status = dev[0].get_status() + if status['key_mgmt'] != "SAE": + raise Exception("SAE not used") + if 'pmf' not in status or status['pmf'] != "1": + raise Exception("PMF not enabled") + + dev[1].connect(new_ssid, psk=new_passphrase, scan_freq="2412", proto="WPA2", + key_mgmt="WPA-PSK", ieee80211w="0")