]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: SAE anti-clogging request with mesh BSS
authorJouni Malinen <jouni@codeaurora.org>
Thu, 14 Mar 2019 22:12:05 +0000 (00:12 +0200)
committerJouni Malinen <j@w1.fi>
Thu, 14 Mar 2019 22:31:09 +0000 (00:31 +0200)
Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
tests/hwsim/test_sae.py
tests/hwsim/test_wpas_mesh.py

index 85927d757411c9a30acb96db08d82267add73cbd..ca0add28023f933d4f9f469d11d7393196a87da9 100644 (file)
@@ -1400,9 +1400,7 @@ def test_sae_anti_clogging_during_attack(dev, apdev):
     try:
         run_sae_anti_clogging_during_attack(dev, apdev)
     finally:
-        subprocess.call(["ip", "link", "set", "dev", apdev[1]["ifname"],
-                         "down"])
-        subprocess.call(["iw", apdev[1]["ifname"], "set", "type", "managed"])
+        stop_monitor(apdev[1]["ifname"])
 
 def build_sae_commit(bssid, addr, group=21, token=None):
     if group == 19:
@@ -1450,6 +1448,30 @@ def sae_rx_commit_token_req(sock, radiotap, send_two=False):
         sock.send(radiotap + frame)
     return True
 
+def radiotap_build():
+    radiotap_payload = struct.pack('BB', 0x08, 0)
+    radiotap_payload += struct.pack('BB', 0, 0)
+    radiotap_payload += struct.pack('BB', 0, 0)
+    radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
+                               0xc002)
+    return radiotap_hdr + radiotap_payload
+
+def start_monitor(ifname, freq=2412):
+    subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
+    subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
+    subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])
+
+    ETH_P_ALL = 3
+    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
+                         socket.htons(ETH_P_ALL))
+    sock.bind((ifname, 0))
+    sock.settimeout(0.5)
+    return sock
+
+def stop_monitor(ifname):
+    subprocess.call(["ip", "link", "set", "dev", ifname, "down"])
+    subprocess.call(["iw", ifname, "set", "type", "managed"])
+
 def run_sae_anti_clogging_during_attack(dev, apdev):
     if "SAE" not in dev[0].get_capability("auth_alg"):
         raise HwsimSkip("SAE not supported")
@@ -1463,21 +1485,8 @@ def run_sae_anti_clogging_during_attack(dev, apdev):
     dev[1].scan_for_bss(hapd.own_addr(), freq=2412)
     dev[1].request("SET sae_groups 21")
 
-    subprocess.check_call(["iw", apdev[1]["ifname"], "set", "type", "monitor"])
-    subprocess.call(["ip", "link", "set", "dev", apdev[1]["ifname"], "up"])
-    subprocess.check_call(["iw", apdev[1]["ifname"], "set", "freq", "2412"])
-
-    ETH_P_ALL = 3
-    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
-                         socket.htons(ETH_P_ALL))
-    sock.bind((apdev[1]["ifname"], 0))
-    sock.settimeout(0.5)
-    radiotap_payload = struct.pack('BB', 0x08, 0)
-    radiotap_payload += struct.pack('BB', 0, 0)
-    radiotap_payload += struct.pack('BB', 0, 0)
-    radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
-                               0xc002)
-    radiotap = radiotap_hdr + radiotap_payload
+    sock = start_monitor(apdev[1]["ifname"])
+    radiotap = radiotap_build()
 
     bssid = binascii.unhexlify(hapd.own_addr().replace(':', ''))
     for i in range(16):
index a44d393d47b2b29b9ad7826ba04a01e2253eba5e..46069beb19327174e1c578455054bf7a169211a3 100644 (file)
@@ -19,6 +19,8 @@ from wpasupplicant import WpaSupplicant
 from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
 from tshark import run_tshark, run_tshark_json
 from test_ap_ht import set_world_reg
+from test_sae import radiotap_build, start_monitor, stop_monitor, \
+    build_sae_commit, sae_rx_commit_token_req
 from hwsim_utils import set_group_map
 
 def check_mesh_support(dev, secure=False):
@@ -2414,3 +2416,60 @@ def test_mesh_forwarding_secure(dev):
         set_group_map(dev[0], 1)
         set_group_map(dev[1], 1)
         set_group_map(dev[2], 1)
+
+def test_mesh_sae_anti_clogging(dev, apdev):
+    """Mesh using SAE and anti-clogging"""
+    try:
+        run_mesh_sae_anti_clogging(dev, apdev)
+    finally:
+        stop_monitor(apdev[1]["ifname"])
+
+def run_mesh_sae_anti_clogging(dev, apdev):
+    check_mesh_support(dev[0], secure=True)
+    check_mesh_support(dev[1], secure=True)
+    check_mesh_support(dev[2], secure=True)
+
+    sock = start_monitor(apdev[1]["ifname"])
+    radiotap = radiotap_build()
+
+    dev[0].request("SET sae_groups 21")
+    id = add_mesh_secure_net(dev[0])
+    dev[0].mesh_group_add(id)
+    check_mesh_group_added(dev[0])
+
+    # This flood of SAE authentication frames is from not yet known mesh STAs,
+    # so the messages get dropped.
+    addr0 = binascii.unhexlify(dev[0].own_addr().replace(':', ''))
+    for i in range(16):
+        addr = binascii.unhexlify("f2%010x" % i)
+        frame = build_sae_commit(addr0, addr)
+        sock.send(radiotap + frame)
+
+    dev[1].request("SET sae_groups 21")
+    id = add_mesh_secure_net(dev[1])
+    dev[1].mesh_group_add(id)
+    check_mesh_group_added(dev[1])
+    check_mesh_connected2(dev)
+
+    # Inject Beacon frames to make the sources of the second flood known to the
+    # target.
+    bcn1 = binascii.unhexlify("80000000" + "ffffffffffff")
+    bcn2 = binascii.unhexlify("0000dd20c44015840500e80310000000010882848b968c1298240301010504000200003204b048606c30140100000fac040100000fac040100000fac0800002d1afe131bffff0000000000000000000001000000000000000000003d16010000000000ffff0000000000000000000000000000720d777061732d6d6573682d736563710701010001010009")
+    for i in range(16):
+        addr = binascii.unhexlify("f4%010x" % i)
+        frame = bcn1 + addr + addr + bcn2
+        sock.send(radiotap + frame)
+
+    # This flood of SAE authentication frames is from known mesh STAs, so the
+    # target will need to process these.
+    for i in range(16):
+        addr = binascii.unhexlify("f4%010x" % i)
+        frame = build_sae_commit(addr0, addr)
+        sock.send(radiotap + frame)
+
+    dev[2].request("SET sae_groups 21")
+    id = add_mesh_secure_net(dev[2])
+    dev[2].mesh_group_add(id)
+    check_mesh_group_added(dev[2])
+    check_mesh_peer_connected(dev[2])
+    check_mesh_peer_connected(dev[0])