]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: DPP Relay and incomplete connections
authorJouni Malinen <jouni@codeaurora.org>
Mon, 12 Apr 2021 21:31:36 +0000 (00:31 +0300)
committerJouni Malinen <j@w1.fi>
Mon, 12 Apr 2021 21:31:36 +0000 (00:31 +0300)
Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
tests/hwsim/test_dpp.py

index b696c5d1dc2ed889bb852c01f6cd40c3635d92e3..71df7fc64148126daca69329c196e0e5650e5884 100644 (file)
@@ -15,6 +15,10 @@ import socket
 import struct
 import subprocess
 import time
+try:
+    from socketserver import StreamRequestHandler, TCPServer
+except ImportError:
+    from SocketServer import StreamRequestHandler, TCPServer
 
 import hostapd
 import hwsim_utils
@@ -5284,6 +5288,61 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False):
     time.sleep(0.5)
     wt.close()
 
+class MyTCPServer(TCPServer):
+    def __init__(self, addr, handler):
+        self.allow_reuse_address = True
+        TCPServer.__init__(self, addr, handler)
+
+class DPPControllerServer(StreamRequestHandler):
+        def handle(self):
+            data = self.rfile.read()
+            # Do not reply
+
+def test_dpp_relay_incomplete_connections(dev, apdev):
+    """DPP Relay and incomplete connections"""
+    check_dpp_capab(dev[0], min_ver=2)
+    check_dpp_capab(dev[1], min_ver=2)
+
+    id_c = dev[1].dpp_bootstrap_gen()
+    uri_c = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
+    res = dev[1].request("DPP_BOOTSTRAP_INFO %d" % id_c)
+    pkhash = None
+    for line in res.splitlines():
+        name, value = line.split('=')
+        if name == "pkhash":
+            pkhash = value
+            break
+    if not pkhash:
+        raise Exception("Could not fetch public key hash from Controller")
+
+    params = {"ssid": "unconfigured",
+              "channel": "6",
+              "dpp_controller": "ipaddr=127.0.0.1 pkhash=" + pkhash}
+    hapd = hostapd.add_ap(apdev[0], params)
+    check_dpp_capab(hapd)
+
+    server = MyTCPServer(("127.0.0.1", 8908), DPPControllerServer)
+    server.timeout = 30
+
+    hapd.set("ext_mgmt_frame_handling", "1")
+    dev[0].dpp_auth_init(uri=uri_c, role="enrollee")
+    msg = hapd.mgmt_rx()
+    if msg is None:
+        raise Exception("MGMT RX wait timed out")
+    dev[0].request("DPP_STOP_LISTEN")
+    frame = msg['frame']
+    for i in range(20):
+        if i == 14:
+            time.sleep(20)
+        addr = struct.pack('6B', 0x02, 0, 0, 0, 0, i)
+        tmp = frame[0:10] + addr + frame[16:]
+        hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(tmp).decode())
+        ev = hapd.wait_event(["DPP-FAIL"], timeout=0.1)
+        if ev:
+            raise Exception("DPP relay failed [%d]: %s" % (i + 1, ev))
+
+    server.server_close()
+
 def test_dpp_tcp(dev, apdev, params):
     """DPP over TCP"""
     prefix = "dpp_tcp"