]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: SAE anti clogging during an attack
authorJouni Malinen <jouni@codeaurora.org>
Sun, 3 Mar 2019 15:10:40 +0000 (17:10 +0200)
committerJouni Malinen <j@w1.fi>
Wed, 6 Mar 2019 11:07:25 +0000 (13:07 +0200)
Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
tests/hwsim/test_sae.py

index 45b1f4bced110fae863fdbfc8b23aac0d708e2f8..48997594342a787f103187b158f990d3c2b4999a 100644 (file)
@@ -10,6 +10,9 @@ import os
 import time
 import logging
 logger = logging.getLogger()
+import socket
+import struct
+import subprocess
 
 import hwsim_utils
 import hostapd
@@ -1372,3 +1375,149 @@ def test_sae_reauth(dev, apdev):
     dev[0].request("PMKSA_FLUSH")
     dev[0].request("REASSOCIATE")
     dev[0].wait_connected(timeout=10, error="Timeout on re-connection")
+
+def test_sae_anti_clogging_during_attack(dev, apdev):
+    """SAE anti clogging during an attack"""
+    try:
+        run_sae_anti_clogging_during_attack(dev, apdev)
+    finally:
+        subprocess.call(["ip", "link", "set", "dev", apdev[1]["ifname"],
+                         "down"])
+        subprocess.call(["iw", apdev[1]["ifname"], "set", "type", "managed"])
+
+def build_sae_commit(bssid, addr, group=21, token=None):
+    if group == 19:
+        scalar = binascii.unhexlify("7332d3ebff24804005ccd8c56141e3ed8d84f40638aa31cd2fac11d4d2e89e7b")
+        element = binascii.unhexlify("954d0f4457066bff3168376a1d7174f4e66620d1792406f613055b98513a7f03a538c13dfbaf2029e2adc6aa96aa0ddcf08ac44887b02f004b7f29b9dbf4b7d9")
+    elif group == 21:
+        scalar = binascii.unhexlify("001eec673111b902f5c8a61c8cb4c1c4793031aeea8c8c319410903bc64bcbaea134ab01c4e016d51436f5b5426f7e2af635759a3033fb4031ea79f89a62a3e2f828")
+        element = binascii.unhexlify("00580eb4b448ea600ea277d5e66e4ed37db82bb04ac90442e9c3727489f366ba4b82f0a472d02caf4cdd142e96baea5915d71374660ee23acbaca38cf3fe8c5fb94b01abbc5278121635d7c06911c5dad8f18d516e1fbe296c179b7c87a1dddfab393337d3d215ed333dd396da6d8f20f798c60d054f1093c24d9c2d98e15c030cc375f0")
+        pass
+    frame = binascii.unhexlify("b0003a01")
+    frame += bssid + addr + bssid
+    frame += binascii.unhexlify("1000")
+    auth_alg = 3
+    transact = 1
+    status = 0
+    frame += struct.pack("<HHHH", auth_alg, transact, status, group)
+    if token:
+        frame += token
+    frame += scalar + element
+    return frame
+
+def sae_rx_commit_token_req(sock, radiotap, send_two=False):
+    msg = sock.recv(1500)
+    ver,pad,len,present = struct.unpack('<BBHL', msg[0:8])
+    frame = msg[len:]
+    fc,duration = struct.unpack('<HH', frame[0:4])
+    if fc != 0xb0:
+        return False
+    frame = frame[4:]
+    da = frame[0:6]
+    if da[0] != 0xf2:
+        return False
+    sa = frame[6:12]
+    bssid = frame[12:18]
+    body = frame[20:]
+
+    alg,seq,status,group = struct.unpack('<HHHH', body[0:8])
+    if alg != 3 or seq != 1 or status != 76:
+        return False
+    token = body[8:]
+
+    frame = build_sae_commit(bssid, da, token=token)
+    sock.send(radiotap + frame)
+    if send_two:
+        sock.send(radiotap + frame)
+    return True
+
+def run_sae_anti_clogging_during_attack(dev, apdev):
+    if "SAE" not in dev[0].get_capability("auth_alg"):
+        raise HwsimSkip("SAE not supported")
+    params = hostapd.wpa2_params(ssid="test-sae", passphrase="12345678")
+    params['wpa_key_mgmt'] = 'SAE'
+    params['sae_groups'] = '21'
+    hapd = hostapd.add_ap(apdev[0], params)
+
+    dev[0].scan_for_bss(hapd.own_addr(), freq=2412)
+    dev[0].request("SET sae_groups 21")
+    dev[1].scan_for_bss(hapd.own_addr(), freq=2412)
+    dev[1].request("SET sae_groups 21")
+
+    subprocess.check_call(["iw", apdev[1]["ifname"], "set", "type", "monitor"])
+    subprocess.call(["ip", "link", "set", "dev", apdev[1]["ifname"], "up"])
+    subprocess.check_call(["iw", apdev[1]["ifname"], "set", "freq", "2412"])
+
+    ETH_P_ALL = 3
+    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
+                         socket.htons(ETH_P_ALL))
+    sock.bind((apdev[1]["ifname"], 0))
+    sock.settimeout(0.5)
+    radiotap_payload = struct.pack('BB', 0x08, 0)
+    radiotap_payload += struct.pack('BB', 0, 0)
+    radiotap_payload += struct.pack('BB', 0, 0)
+    radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
+                               0xc002)
+    radiotap = radiotap_hdr + radiotap_payload
+
+    bssid = binascii.unhexlify(hapd.own_addr().replace(':', ''))
+    for i in range(16):
+        addr = binascii.unhexlify("f2%010x" % i)
+        frame = build_sae_commit(bssid, addr)
+        sock.send(radiotap + frame)
+        sock.send(radiotap + frame)
+
+    count = 0
+    for i in range(150):
+        if sae_rx_commit_token_req(sock, radiotap, send_two=True):
+            count += 1
+    logger.info("Number of token responses sent: %d" % count)
+    if count < 10:
+        raise Exception("Too few token responses seen: %d" % count)
+
+    for i in range(16):
+        addr = binascii.unhexlify("f201%08x" % i)
+        frame = build_sae_commit(bssid, addr)
+        sock.send(radiotap + frame)
+
+    count = 0
+    for i in range(150):
+        if sae_rx_commit_token_req(sock, radiotap):
+            count += 1
+            if count == 10:
+                break
+    if count < 10:
+        raise Exception("Too few token responses in second round: %d" % count)
+
+    dev[0].connect("test-sae", psk="12345678", key_mgmt="SAE",
+                   scan_freq="2412", wait_connect=False)
+    dev[1].connect("test-sae", psk="12345678", key_mgmt="SAE",
+                   scan_freq="2412", wait_connect=False)
+
+    count = 0
+    connected0 = False
+    connected1 = False
+    for i in range(1000):
+        if sae_rx_commit_token_req(sock, radiotap):
+            count += 1
+            addr = binascii.unhexlify("f202%08x" % i)
+            frame = build_sae_commit(bssid, addr)
+            sock.send(radiotap + frame)
+        while dev[0].mon.pending():
+            ev = dev[0].mon.recv()
+            logger.debug("EV0: " + ev)
+            if "CTRL-EVENT-CONNECTED" in ev:
+                connected0 = True
+        while dev[1].mon.pending():
+            ev = dev[1].mon.recv()
+            logger.debug("EV1: " + ev)
+            if "CTRL-EVENT-CONNECTED" in ev:
+                connected1 = True
+        if connected0 and connected1:
+            break
+    if not connected0:
+        raise Exception("Real station(0) did not get connected")
+    if not connected1:
+        raise Exception("Real station(1) did not get connected")
+    if count < 1:
+        raise Exception("Too few token responses in third round: %d" % count)