logger = logging.getLogger()
import os
import re
+import socket
import struct
import subprocess
import time
import hostapd
-from utils import HwsimSkip, fail_test, skip_with_fips
+from utils import HwsimSkip, fail_test, skip_with_fips, start_monitor, stop_monitor, radiotap_build
import hwsim_utils
from wpasupplicant import WpaSupplicant
if ev is None:
raise Exception("Timeout on AP-STA-CONNECTED from hostapd")
-def eapol_test(apdev, dev, wpa2=True):
+def eapol_test(apdev, dev, wpa2=True, ieee80211w=0):
bssid = apdev['bssid']
if wpa2:
ssid = "test-wpa2-psk"
else:
params = hostapd.wpa_params(ssid=ssid)
params['wpa_psk'] = psk
+ params['ieee80211w'] = str(ieee80211w)
hapd = hostapd.add_ap(apdev, params)
hapd.request("SET ext_eapol_frame_io 1")
dev.request("SET ext_eapol_frame_io 1")
- dev.connect(ssid, raw_psk=psk, scan_freq="2412", wait_connect=False)
+ dev.connect(ssid, raw_psk=psk, scan_freq="2412", wait_connect=False,
+ ieee80211w=str(ieee80211w))
addr = dev.p2p_interface_addr()
if wpa2:
- rsne = binascii.unhexlify('30140100000fac040100000fac040100000fac020000')
+ if ieee80211w == 2:
+ rsne = binascii.unhexlify('30140100000fac040100000fac040100000fac02cc00')
+ else:
+ rsne = binascii.unhexlify('30140100000fac040100000fac040100000fac020000')
else:
rsne = binascii.unhexlify('dd160050f20101000050f20201000050f20201000050f202')
snonce = binascii.unhexlify('1111111111111111111111111111111111111111111111111111111111111111')
raise Exception("Unencrypted GTK KDE not reported")
dev[0].wait_disconnected(timeout=1)
+def run_psk_supp_proto_pmf2(dev, apdev, igtk_kde=None, fail=False):
+ (bssid, ssid, hapd, snonce, pmk, addr, rsne) = eapol_test(apdev[0], dev[0],
+ ieee80211w=2)
+
+ # Wait for EAPOL-Key msg 1/4 from hostapd to determine when associated
+ msg = recv_eapol(hapd)
+ dev[0].dump_monitor()
+
+ # Build own EAPOL-Key msg 1/4
+ anonce = binascii.unhexlify('2222222222222222222222222222222222222222222222222222222222222222')
+ counter = 1
+ msg = build_eapol_key_1_4(anonce, replay_counter=counter)
+ counter += 1
+ send_eapol(dev[0], bssid, build_eapol(msg))
+ msg = recv_eapol(dev[0])
+ snonce = msg['rsn_key_nonce']
+
+ (ptk, kck, kek) = pmk_to_ptk(pmk, addr, bssid, snonce, anonce)
+
+ logger.debug("EAPOL-Key msg 3/4")
+ dev[0].dump_monitor()
+ gtk_kde = binascii.unhexlify('dd16000fac010100dc11188831bf4aa4a8678d2b41498618')
+ plain = rsne + gtk_kde
+ if igtk_kde:
+ plain += igtk_kde
+ wrapped = aes_wrap(kek, pad_key_data(plain))
+ msg = build_eapol_key_3_4(anonce, kck, wrapped, replay_counter=counter)
+ counter += 1
+ send_eapol(dev[0], bssid, build_eapol(msg))
+ if fail:
+ dev[0].wait_disconnected(timeout=1)
+ return
+
+ dev[0].wait_connected(timeout=1)
+
+ # Verify that an unprotected broadcast Deauthentication frame is ignored
+ bssid = binascii.unhexlify(hapd.own_addr().replace(':', ''))
+ sock = start_monitor(apdev[1]["ifname"])
+ radiotap = radiotap_build()
+ frame = binascii.unhexlify("c0003a01")
+ frame += 6*b'\xff' + bssid + bssid
+ frame += binascii.unhexlify("1000" + "0300")
+ sock.send(radiotap + frame)
+ # And same with incorrect BIP protection
+ for keyid in ["0400", "0500", "0600", "0004", "0005", "0006", "ffff"]:
+ frame2 = frame + binascii.unhexlify("4c10" + keyid + "010000000000c0e5ca5f2b3b4de9")
+ sock.send(radiotap + frame2)
+ ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.5)
+ if ev is not None:
+ raise Exception("Unexpected disconnection")
+
+def run_psk_supp_proto_pmf(dev, apdev, igtk_kde=None, fail=False):
+ try:
+ run_psk_supp_proto_pmf2(dev, apdev, igtk_kde=igtk_kde, fail=fail)
+ finally:
+ stop_monitor(apdev[1]["ifname"])
+
+def test_ap_wpa2_psk_supp_proto_no_igtk(dev, apdev):
+ """WPA2-PSK supplicant protocol testing: no IGTK KDE"""
+ run_psk_supp_proto_pmf(dev, apdev, igtk_kde=None)
+
+def test_ap_wpa2_psk_supp_proto_igtk_ok(dev, apdev):
+ """WPA2-PSK supplicant protocol testing: valid IGTK KDE"""
+ igtk_kde = binascii.unhexlify('dd1c' + '000fac09' + '0400' + 6*'00' + 16*'77')
+ run_psk_supp_proto_pmf(dev, apdev, igtk_kde=igtk_kde)
+
+def test_ap_wpa2_psk_supp_proto_igtk_keyid_swap(dev, apdev):
+ """WPA2-PSK supplicant protocol testing: swapped IGTK KeyID"""
+ igtk_kde = binascii.unhexlify('dd1c' + '000fac09' + '0004' + 6*'00' + 16*'77')
+ run_psk_supp_proto_pmf(dev, apdev, igtk_kde=igtk_kde)
+
+def test_ap_wpa2_psk_supp_proto_igtk_keyid_too_large(dev, apdev):
+ """WPA2-PSK supplicant protocol testing: too large IGTK KeyID"""
+ igtk_kde = binascii.unhexlify('dd1c' + '000fac09' + 'ffff' + 6*'00' + 16*'77')
+ run_psk_supp_proto_pmf(dev, apdev, igtk_kde=igtk_kde, fail=True)
+
+def test_ap_wpa2_psk_supp_proto_igtk_keyid_unexpected(dev, apdev):
+ """WPA2-PSK supplicant protocol testing: unexpected IGTK KeyID"""
+ igtk_kde = binascii.unhexlify('dd1c' + '000fac09' + '0006' + 6*'00' + 16*'77')
+ run_psk_supp_proto_pmf(dev, apdev, igtk_kde=igtk_kde, fail=True)
+
def find_wpas_process(dev):
ifname = dev.ifname
err, data = dev.cmd_execute(['ps', 'ax'])
import hwsim_utils
import hostapd
from wpasupplicant import WpaSupplicant
-from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
+from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger, start_monitor, stop_monitor, radiotap_build
from test_ap_psk import find_wpas_process, read_process_memory, verify_not_present, get_key_locations
@remote_compatible
sock.send(radiotap + frame)
return True
-def radiotap_build():
- radiotap_payload = struct.pack('BB', 0x08, 0)
- radiotap_payload += struct.pack('BB', 0, 0)
- radiotap_payload += struct.pack('BB', 0, 0)
- radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
- 0xc002)
- return radiotap_hdr + radiotap_payload
-
-def start_monitor(ifname, freq=2412):
- subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
- subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
- subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])
-
- ETH_P_ALL = 3
- sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
- socket.htons(ETH_P_ALL))
- sock.bind((ifname, 0))
- sock.settimeout(0.5)
- return sock
-
-def stop_monitor(ifname):
- subprocess.call(["ip", "link", "set", "dev", ifname, "down"])
- subprocess.call(["iw", ifname, "set", "type", "managed"])
-
def run_sae_anti_clogging_during_attack(dev, apdev):
if "SAE" not in dev[0].get_capability("auth_alg"):
raise HwsimSkip("SAE not supported")
import binascii
import os
+import socket
import struct
import subprocess
import time
clear_country(dev)
for i in range(count):
dev[i].flush_scan_cache()
+
+def radiotap_build():
+ radiotap_payload = struct.pack('BB', 0x08, 0)
+ radiotap_payload += struct.pack('BB', 0, 0)
+ radiotap_payload += struct.pack('BB', 0, 0)
+ radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
+ 0xc002)
+ return radiotap_hdr + radiotap_payload
+
+def start_monitor(ifname, freq=2412):
+ subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
+ subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
+ subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])
+
+ ETH_P_ALL = 3
+ sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
+ socket.htons(ETH_P_ALL))
+ sock.bind((ifname, 0))
+ sock.settimeout(0.5)
+ return sock
+
+def stop_monitor(ifname):
+ subprocess.call(["ip", "link", "set", "dev", ifname, "down"])
+ subprocess.call(["iw", ifname, "set", "type", "managed"])