]> git.ipfire.org Git - thirdparty/hostap.git/blobdiff - tests/hwsim/test_dpp.py
tests: DPP over TCP using Controller/Relay
[thirdparty/hostap.git] / tests / hwsim / test_dpp.py
index 770b80e093bcc95bd3f54d854ef7e5a2ceb8f9b2..da32164a87aa8eb8b21ef47881810fc68d271280 100644 (file)
@@ -4385,3 +4385,122 @@ def run_dpp_legacy_and_dpp_akm(dev, apdev):
 
     dev[0].request("DISCONNECT")
     dev[0].wait_disconnected()
+
+def test_dpp_controller_relay(dev, apdev, params):
+    """DPP Controller/Relay"""
+    try:
+        run_dpp_controller_relay(dev, apdev, params)
+    finally:
+        dev[0].set("dpp_config_processing", "0")
+        dev[1].request("DPP_CONTROLLER_STOP")
+
+def run_dpp_controller_relay(dev, apdev, params):
+    check_dpp_capab(dev[0])
+    check_dpp_capab(dev[1])
+    prefix = "dpp_controller_relay"
+    cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
+
+    cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
+                            '-w', cap_lo, '-s', '2000'],
+                           stderr=open('/dev/null', 'w'))
+
+    # Controller
+    conf_id = dev[1].dpp_configurator_add()
+    dev[1].set("dpp_configurator_params",
+               " conf=sta-dpp configurator=%d" % conf_id)
+    id_c = dev[1].dpp_bootstrap_gen()
+    uri_c = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
+    res = dev[1].request("DPP_BOOTSTRAP_INFO %d" % id_c)
+    pkhash = None
+    for line in res.splitlines():
+        name, value = line.split('=')
+        if name == "pkhash":
+            pkhash = value
+            break
+    if not pkhash:
+        raise Exception("Could not fetch public key hash from Controller")
+    if "OK" not in dev[1].request("DPP_CONTROLLER_START"):
+        raise Exception("Failed to start Controller")
+
+    # Relay
+    params = {"ssid": "unconfigured",
+              "channel": "6",
+              "dpp_controller": "ipaddr=127.0.0.1 pkhash=" + pkhash}
+    relay = hostapd.add_ap(apdev[1], params)
+    check_dpp_capab(relay)
+
+    # Enroll Relay to the network
+    # TODO: Do this over TCP once direct Enrollee-over-TCP case is supported
+    id_h = relay.dpp_bootstrap_gen(chan="81/6", mac=True)
+    uri_r = relay.request("DPP_BOOTSTRAP_GET_URI %d" % id_h)
+    dev[1].dpp_auth_init(uri=uri_r, conf="ap-dpp", configurator=conf_id)
+    wait_auth_success(relay, dev[1], configurator=dev[1], enrollee=relay)
+    update_hapd_config(relay)
+
+    # Initiate from Enrollee with broadcast DPP Authentication Request
+    dev[0].set("dpp_config_processing", "2")
+    dev[0].dpp_auth_init(uri=uri_c, role="enrollee")
+    wait_auth_success(dev[1], dev[0], configurator=dev[1], enrollee=dev[0],
+                      allow_enrollee_failure=True,
+                      allow_configurator_failure=True)
+    dev[0].wait_connected()
+
+    time.sleep(0.5)
+    cmd.terminate()
+
+def test_dpp_tcp(dev, apdev, params):
+    """DPP over TCP"""
+    prefix = "dpp_tcp"
+    cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
+    try:
+        run_dpp_tcp(dev, apdev, cap_lo)
+    finally:
+        dev[1].request("DPP_CONTROLLER_STOP")
+
+def test_dpp_tcp_port(dev, apdev, params):
+    """DPP over TCP and specified port"""
+    prefix = "dpp_tcp_port"
+    cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
+    try:
+        run_dpp_tcp(dev, apdev, cap_lo, port="23456")
+    finally:
+        dev[1].request("DPP_CONTROLLER_STOP")
+
+def run_dpp_tcp(dev, apdev, cap_lo, port=None):
+    check_dpp_capab(dev[0])
+    check_dpp_capab(dev[1])
+
+    cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
+                            '-w', cap_lo, '-s', '2000'],
+                           stderr=open('/dev/null', 'w'))
+    time.sleep(1)
+
+    # Controller
+    conf_id = dev[1].dpp_configurator_add()
+    dev[1].set("dpp_configurator_params",
+               " conf=sta-dpp configurator=%d" % conf_id)
+    id_c = dev[1].dpp_bootstrap_gen()
+    uri_c = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
+    res = dev[1].request("DPP_BOOTSTRAP_INFO %d" % id_c)
+    pkhash = None
+    for line in res.splitlines():
+        name, value = line.split('=')
+        if name == "pkhash":
+            pkhash = value
+            break
+    if not pkhash:
+        raise Exception("Could not fetch public key hash from Controller")
+    req = "DPP_CONTROLLER_START"
+    if port:
+        req += " tcp_port=" + port
+    if "OK" not in dev[1].request(req):
+        raise Exception("Failed to start Controller")
+
+    # Initiate from Enrollee with broadcast DPP Authentication Request
+    dev[0].dpp_auth_init(uri=uri_c, role="enrollee", tcp_addr="127.0.0.1",
+                         tcp_port=port)
+    wait_auth_success(dev[1], dev[0], configurator=dev[1], enrollee=dev[0],
+                      allow_enrollee_failure=True,
+                      allow_configurator_failure=True)
+    time.sleep(0.5)
+    cmd.terminate()