]> git.ipfire.org Git - thirdparty/hostap.git/blobdiff - tests/hwsim/test_dpp.py
tests: DPP TCP failure cases
[thirdparty/hostap.git] / tests / hwsim / test_dpp.py
index 770b80e093bcc95bd3f54d854ef7e5a2ceb8f9b2..a7f75dca0611caa5194b60867ba646a85d7db7ed 100644 (file)
@@ -55,6 +55,8 @@ def test_dpp_qr_code_parsing(dev, apdev):
 
     tests = ["DPP:C:81/1,115/36;K:MDkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDIgADM2206avxHJaHXgLMkq/24e0rsrfMP9K1Tm8gx+ovP0I=;;",
              "DPP:C:81/1,81/2,81/3,81/4,81/5,81/6,81/7,81/8,81/9,81/10,81/11,81/12,81/13,82/14,83/1,83/2,83/3,83/4,83/5,83/6,83/7,83/8,83/9,84/5,84/6,84/7,84/8,84/9,84/10,84/11,84/12,84/13,115/36;K:MDkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDIgADM2206avxHJaHXgLMkq/24e0rsrfMP9K1Tm8gx+ovP0I=;;",
+             "DPP:C:81/1,2,3,4,5,6,7,8,9,10,11,12,13,82/14,83/1,2,3,4,5,6,7,8,9,84/5,6,7,8,9,10,11,12,13,115/36;K:MDkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDIgADM2206avxHJaHXgLMkq/24e0rsrfMP9K1Tm8gx+ovP0I=;;",
+             "DPP:C:81/1,2,3;K:MDkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDIgADM2206avxHJaHXgLMkq/24e0rsrfMP9K1Tm8gx+ovP0I=;;",
              "DPP:I:SN=4774LH2b4044;M:010203040506;K:MDkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDIgADURzxmttZoIRIPWGoQMV00XHWCAQIhXruVWOz0NjlkIA=;;",
              "DPP:I:;M:010203040506;K:MDkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDIgADURzxmttZoIRIPWGoQMV00XHWCAQIhXruVWOz0NjlkIA=;;"]
     for uri in tests:
@@ -1468,6 +1470,7 @@ def test_dpp_and_sae_akm(dev, apdev):
     if val != "DPP":
         raise Exception("Unexpected key_mgmt for DPP: " + val)
 
+    dev[1].request("SET sae_groups ")
     id = dev[1].connect("dpp+sae", key_mgmt="SAE", scan_freq="2412",
                         ieee80211w="2", psk="sae-password")
     val = dev[1].get_status_field("key_mgmt")
@@ -1779,6 +1782,7 @@ def run_dpp_auto_connect_legacy(dev, apdev, conf='sta-psk',
 
     hapd = hostapd.add_ap(apdev[0], params)
 
+    dev[0].request("SET sae_groups ")
     dev[0].set("dpp_config_processing", "2")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
@@ -3318,6 +3322,11 @@ def test_dpp_qr_code_chan_list_unicast(dev, apdev):
     run_dpp_qr_code_chan_list(dev, apdev, True, 2417,
                               "81/1,81/2,81/3,81/4,81/5,81/6,81/7,81/8,81/9,81/10,81/11,81/12,81/13")
 
+def test_dpp_qr_code_chan_list_unicast2(dev, apdev):
+    """DPP QR Code and 2.4 GHz channels (unicast 2)"""
+    run_dpp_qr_code_chan_list(dev, apdev, True, 2417,
+                              "81/1,2,3,4,5,6,7,8,9,10,11,12,13")
+
 def test_dpp_qr_code_chan_list_no_peer_unicast(dev, apdev):
     """DPP QR Code and channel list and no peer (unicast)"""
     run_dpp_qr_code_chan_list(dev, apdev, True, 2417, "81/1,81/6,81/11",
@@ -4340,6 +4349,7 @@ def run_dpp_legacy_and_dpp_akm(dev, apdev):
     except:
         raise HwsimSkip("DPP not supported")
 
+    dev[0].request("SET sae_groups ")
     conf_id = dev[1].dpp_configurator_add(key=csign)
     dev[0].set("dpp_config_processing", "1")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
@@ -4385,3 +4395,184 @@ def run_dpp_legacy_and_dpp_akm(dev, apdev):
 
     dev[0].request("DISCONNECT")
     dev[0].wait_disconnected()
+
+def test_dpp_controller_relay(dev, apdev, params):
+    """DPP Controller/Relay"""
+    try:
+        run_dpp_controller_relay(dev, apdev, params)
+    finally:
+        dev[0].set("dpp_config_processing", "0")
+        dev[1].request("DPP_CONTROLLER_STOP")
+
+def run_dpp_controller_relay(dev, apdev, params):
+    check_dpp_capab(dev[0])
+    check_dpp_capab(dev[1])
+    prefix = "dpp_controller_relay"
+    cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
+
+    cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
+                            '-w', cap_lo, '-s', '2000'],
+                           stderr=open('/dev/null', 'w'))
+
+    # Controller
+    conf_id = dev[1].dpp_configurator_add()
+    dev[1].set("dpp_configurator_params",
+               " conf=sta-dpp configurator=%d" % conf_id)
+    id_c = dev[1].dpp_bootstrap_gen()
+    uri_c = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
+    res = dev[1].request("DPP_BOOTSTRAP_INFO %d" % id_c)
+    pkhash = None
+    for line in res.splitlines():
+        name, value = line.split('=')
+        if name == "pkhash":
+            pkhash = value
+            break
+    if not pkhash:
+        raise Exception("Could not fetch public key hash from Controller")
+    if "OK" not in dev[1].request("DPP_CONTROLLER_START"):
+        raise Exception("Failed to start Controller")
+
+    # Relay
+    params = {"ssid": "unconfigured",
+              "channel": "6",
+              "dpp_controller": "ipaddr=127.0.0.1 pkhash=" + pkhash}
+    relay = hostapd.add_ap(apdev[1], params)
+    check_dpp_capab(relay)
+
+    # Enroll Relay to the network
+    # TODO: Do this over TCP once direct Enrollee-over-TCP case is supported
+    id_h = relay.dpp_bootstrap_gen(chan="81/6", mac=True)
+    uri_r = relay.request("DPP_BOOTSTRAP_GET_URI %d" % id_h)
+    dev[1].dpp_auth_init(uri=uri_r, conf="ap-dpp", configurator=conf_id)
+    wait_auth_success(relay, dev[1], configurator=dev[1], enrollee=relay)
+    update_hapd_config(relay)
+
+    # Initiate from Enrollee with broadcast DPP Authentication Request
+    dev[0].set("dpp_config_processing", "2")
+    dev[0].dpp_auth_init(uri=uri_c, role="enrollee")
+    wait_auth_success(dev[1], dev[0], configurator=dev[1], enrollee=dev[0],
+                      allow_enrollee_failure=True,
+                      allow_configurator_failure=True)
+    dev[0].wait_connected()
+
+    time.sleep(0.5)
+    cmd.terminate()
+
+def test_dpp_tcp(dev, apdev, params):
+    """DPP over TCP"""
+    prefix = "dpp_tcp"
+    cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
+    try:
+        run_dpp_tcp(dev, apdev, cap_lo)
+    finally:
+        dev[1].request("DPP_CONTROLLER_STOP")
+
+def test_dpp_tcp_port(dev, apdev, params):
+    """DPP over TCP and specified port"""
+    prefix = "dpp_tcp_port"
+    cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
+    try:
+        run_dpp_tcp(dev, apdev, cap_lo, port="23456")
+    finally:
+        dev[1].request("DPP_CONTROLLER_STOP")
+
+def run_dpp_tcp(dev, apdev, cap_lo, port=None):
+    check_dpp_capab(dev[0])
+    check_dpp_capab(dev[1])
+
+    cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
+                            '-w', cap_lo, '-s', '2000'],
+                           stderr=open('/dev/null', 'w'))
+    time.sleep(1)
+
+    # Controller
+    conf_id = dev[1].dpp_configurator_add()
+    dev[1].set("dpp_configurator_params",
+               " conf=sta-dpp configurator=%d" % conf_id)
+    id_c = dev[1].dpp_bootstrap_gen()
+    uri_c = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
+    res = dev[1].request("DPP_BOOTSTRAP_INFO %d" % id_c)
+    pkhash = None
+    for line in res.splitlines():
+        name, value = line.split('=')
+        if name == "pkhash":
+            pkhash = value
+            break
+    if not pkhash:
+        raise Exception("Could not fetch public key hash from Controller")
+    req = "DPP_CONTROLLER_START"
+    if port:
+        req += " tcp_port=" + port
+    if "OK" not in dev[1].request(req):
+        raise Exception("Failed to start Controller")
+
+    # Initiate from Enrollee with broadcast DPP Authentication Request
+    dev[0].dpp_auth_init(uri=uri_c, role="enrollee", tcp_addr="127.0.0.1",
+                         tcp_port=port)
+    wait_auth_success(dev[1], dev[0], configurator=dev[1], enrollee=dev[0],
+                      allow_enrollee_failure=True,
+                      allow_configurator_failure=True)
+    time.sleep(0.5)
+    cmd.terminate()
+
+def test_dpp_tcp_controller_start_failure(dev, apdev, params):
+    """DPP Controller startup failure"""
+    check_dpp_capab(dev[0])
+
+    try:
+        if "OK" not in dev[0].request("DPP_CONTROLLER_START"):
+            raise Exception("Could not start Controller")
+        if "OK" in dev[0].request("DPP_CONTROLLER_START"):
+            raise Exception("Second Controller start not rejected")
+    finally:
+        dev[0].request("DPP_CONTROLLER_STOP")
+
+    tests = ["dpp_controller_start",
+             "eloop_sock_table_add_sock;?eloop_register_sock;dpp_controller_start"]
+    for func in tests:
+        with alloc_fail(dev[0], 1, func):
+            if "FAIL" not in dev[0].request("DPP_CONTROLLER_START"):
+                raise Exception("Failure not reported during OOM")
+
+def test_dpp_tcp_init_failure(dev, apdev, params):
+    """DPP TCP init failure"""
+    check_dpp_capab(dev[0])
+    check_dpp_capab(dev[1])
+    id_c = dev[1].dpp_bootstrap_gen()
+    uri_c = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
+    peer = dev[0].dpp_qr_code(uri_c)
+    tests = ["dpp_tcp_init",
+             "eloop_sock_table_add_sock;?eloop_register_sock;dpp_tcp_init",
+             "dpp_tcp_encaps"]
+    cmd = "DPP_AUTH_INIT peer=%d tcp_addr=127.0.0.1" % peer
+    for func in tests:
+        with alloc_fail(dev[0], 1, func):
+            if "FAIL" not in dev[0].request(cmd):
+                raise Exception("DPP_AUTH_INIT accepted during OOM")
+
+def test_dpp_controller_rx_failure(dev, apdev, params):
+    """DPP Controller RX failure"""
+    check_dpp_capab(dev[0])
+    check_dpp_capab(dev[1])
+    try:
+        run_dpp_controller_rx_failure(dev, apdev)
+    finally:
+        dev[0].request("DPP_CONTROLLER_STOP")
+
+def run_dpp_controller_rx_failure(dev, apdev):
+    if "OK" not in dev[0].request("DPP_CONTROLLER_START"):
+        raise Exception("Could not start Controller")
+    id_c = dev[0].dpp_bootstrap_gen()
+    uri_c = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
+    peer = dev[1].dpp_qr_code(uri_c)
+    tests = ["dpp_controller_tcp_cb",
+             "eloop_sock_table_add_sock;?eloop_register_sock;dpp_controller_tcp_cb",
+             "dpp_controller_rx",
+             "dpp_controller_rx_auth_req",
+             "wpabuf_alloc;=dpp_controller_rx_auth_req"]
+    cmd = "DPP_AUTH_INIT peer=%d tcp_addr=127.0.0.1" % peer
+    for func in tests:
+        with alloc_fail(dev[0], 1, func):
+            if "OK" not in dev[1].request(cmd):
+                raise Exception("Failed to initiate TCP connection")
+            wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")