]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: Helper functions for DPP PKEX initiator and responder
authorJouni Malinen <j@w1.fi>
Sun, 17 Mar 2019 17:22:11 +0000 (19:22 +0200)
committerJouni Malinen <j@w1.fi>
Sun, 17 Mar 2019 18:57:58 +0000 (20:57 +0200)
Signed-off-by: Jouni Malinen <j@w1.fi>
tests/hwsim/hostapd.py
tests/hwsim/test_dpp.py
tests/hwsim/wpasupplicant.py

index 21cbfc93ebadc61c68816894bafa929fc273902b..61db918d7f39373d6452d79200aeebfad1ed0407 100644 (file)
@@ -418,6 +418,38 @@ class Hostapd:
         if "OK" not in self.request(cmd):
             raise Exception("Failed to start listen operation")
 
+    def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
+                      extra=None, use_id=None):
+        if use_id is None:
+            id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
+        else:
+            id1 = use_id
+        cmd = "own=%d " % id1
+        if identifier:
+            cmd += "identifier=%s " % identifier
+        cmd += "init=1 "
+        if role:
+            cmd += "role=%s " % role
+        if extra:
+            cmd += extra + " "
+        cmd += "code=%s" % code
+        res = self.request("DPP_PKEX_ADD " + cmd)
+        if "FAIL" in res:
+            raise Exception("Failed to set PKEX data (initiator)")
+        return id1
+
+    def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
+                      listen_role=None):
+        id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
+        cmd = "own=%d " % id0
+        if identifier:
+            cmd += "identifier=%s " % identifier
+        cmd += "code=%s" % code
+        res = self.request("DPP_PKEX_ADD " + cmd)
+        if "FAIL" in res:
+            raise Exception("Failed to set PKEX data (responder)")
+        self.dpp_listen(freq, role=listen_role)
+
 def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
            global_ctrl_override=None):
         if isinstance(apdev, dict):
index 19272af76ea260df3e14b3f39761055ee824a887..86d0e09f5cd7e04fb9b8c474d07d12006af50f7d 100644 (file)
@@ -2231,28 +2231,15 @@ def test_dpp_pkex_identifier_mismatch3(dev, apdev):
     run_dpp_pkex(dev, apdev, identifier_i=None, identifier_r="bar",
                  expect_no_resp=True)
 
-def run_dpp_pkex(dev, apdev, curve=None, init_extra="", check_config=False,
+def run_dpp_pkex(dev, apdev, curve=None, init_extra=None, check_config=False,
                  identifier_i="test", identifier_r="test",
                  expect_no_resp=False):
     check_dpp_capab(dev[0], curve and "brainpool" in curve)
     check_dpp_capab(dev[1], curve and "brainpool" in curve)
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex", curve=curve)
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex", curve=curve)
-
-    identifier = " identifier=" + identifier_r if identifier_r else ""
-    cmd = "DPP_PKEX_ADD own=%d%s code=secret" % (id0, identifier)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
-
-    identifier = " identifier=" + identifier_i if identifier_i else ""
-    cmd = "DPP_PKEX_ADD own=%d%s init=1 %s code=secret" % (id1, identifier,
-                                                           init_extra)
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
+    dev[0].dpp_pkex_resp(2437, identifier=identifier_r, code="secret",
+                         curve=curve)
+    dev[1].dpp_pkex_init(identifier=identifier_i, code="secret", curve=curve,
+                         extra=init_extra)
 
     if expect_no_resp:
         ev = dev[0].wait_event(["DPP-RX"], timeout=10)
@@ -2289,21 +2276,8 @@ def test_dpp_pkex_5ghz(dev, apdev):
 def run_dpp_pkex_5ghz(dev, apdev):
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(5745)
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=secret" % (id1)
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
-
+    dev[0].dpp_pkex_resp(5745, identifier="test", code="secret")
+    dev[1].dpp_pkex_init(identifier="test", code="secret")
     wait_auth_success(dev[0], dev[1], timeout=20)
 
 def test_dpp_pkex_test_vector(dev, apdev):
@@ -2337,90 +2311,48 @@ def test_dpp_pkex_test_vector(dev, apdev):
     dev[1].set("dpp_pkex_own_mac_override", init_addr)
     dev[1].set("dpp_pkex_peer_mac_override", resp_addr)
 
-    # Responder bootstrapping key
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex",
-                                   key=p256_prefix + resp_priv + p256_postfix)
-
     # Responder y/Y keypair override
     dev[0].set("dpp_pkex_ephemeral_key_override",
                p256_prefix + resp_y_priv + p256_postfix)
 
-    # Initiator bootstrapping key
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex",
-                                   key=p256_prefix + init_priv + p256_postfix)
-
     # Initiator x/X keypair override
     dev[1].set("dpp_pkex_ephemeral_key_override",
                p256_prefix + init_x_priv + p256_postfix)
 
-    cmd = "DPP_PKEX_ADD own=%d identifier=%s code=%s" % (id0, identifier, code)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=%s init=1 code=%s" % (id1, identifier, code)
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
+    dev[0].dpp_pkex_resp(2437, identifier=identifier, code=code,
+                         key=p256_prefix + resp_priv + p256_postfix)
+    dev[1].dpp_pkex_init(identifier=identifier, code=code,
+                         key=p256_prefix + init_priv + p256_postfix)
     wait_auth_success(dev[0], dev[1])
 
 def test_dpp_pkex_code_mismatch(dev, apdev):
     """DPP and PKEX with mismatching code"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=unknown" % id1
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
-
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret")
+    id1 = dev[1].dpp_pkex_init(identifier="test", code="unknown")
     ev = dev[0].wait_event(["DPP-FAIL"], timeout=5)
     if ev is None:
         raise Exception("Failure not reported")
     if "possible PKEX code mismatch" not in ev:
         raise Exception("Unexpected result: " + ev)
-
     dev[0].dump_monitor()
     dev[1].dump_monitor()
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=secret" % id1
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator, retry)")
+    dev[1].dpp_pkex_init(identifier="test", code="secret", use_id=id1)
     wait_auth_success(dev[0], dev[1])
 
 def test_dpp_pkex_code_mismatch_limit(dev, apdev):
     """DPP and PKEX with mismatching code limit"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret")
 
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
-
+    id1 = None
     for i in range(5):
         dev[0].dump_monitor()
         dev[1].dump_monitor()
-        cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=unknown" % id1
-        res = dev[1].request(cmd)
-        if "FAIL" in res:
-            raise Exception("Failed to set PKEX data (initiator)")
-
+        id1 = dev[1].dpp_pkex_init(identifier="test", code="unknown",
+                                   use_id=id1)
         ev = dev[0].wait_event(["DPP-FAIL"], timeout=5)
         if ev is None:
             raise Exception("Failure not reported")
@@ -2435,20 +2367,8 @@ def test_dpp_pkex_curve_mismatch(dev, apdev):
     """DPP and PKEX with mismatching curve"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex", curve="P-256")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex", curve="P-384")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=secret" % id1
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret", curve="P-256")
+    dev[1].dpp_pkex_init(identifier="test", code="secret", curve="P-384")
 
     ev = dev[0].wait_event(["DPP-FAIL"], timeout=5)
     if ev is None:
@@ -2474,21 +2394,10 @@ def test_dpp_pkex_curve_mismatch_failure2(dev, apdev):
 def run_dpp_pkex_curve_mismatch_failure(dev, apdev, func):
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex", curve="P-256")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex", curve="P-384")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret", curve="P-256")
 
     with alloc_fail(dev[0], 1, func):
-        cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=secret" % id1
-        res = dev[1].request(cmd)
-        if "FAIL" in res:
-            raise Exception("Failed to set PKEX data (initiator)")
+        dev[1].dpp_pkex_init(identifier="test", code="secret", curve="P-384")
 
         ev = dev[0].wait_event(["DPP-FAIL"], timeout=5)
         if ev is None:
@@ -2500,43 +2409,21 @@ def test_dpp_pkex_exchange_resp_processing_failure(dev, apdev):
     """DPP and PKEX with local failure in processing Exchange Resp"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret")
 
     with fail_test(dev[1], 1, "dpp_pkex_derive_Qr;dpp_pkex_rx_exchange_resp"):
-        cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=secret" % id1
-        res = dev[1].request(cmd)
-        if "FAIL" in res:
-            raise Exception("Failed to set PKEX data (initiator)")
+        dev[1].dpp_pkex_init(identifier="test", code="secret")
         wait_fail_trigger(dev[1], "GET_FAIL")
 
 def test_dpp_pkex_commit_reveal_req_processing_failure(dev, apdev):
     """DPP and PKEX with local failure in processing Commit Reveal Req"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret")
 
     with alloc_fail(dev[0], 1,
                     "dpp_get_pubkey_point;dpp_pkex_rx_commit_reveal_req"):
-        cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=secret" % id1
-        res = dev[1].request(cmd)
-        if "FAIL" in res:
-            raise Exception("Failed to set PKEX data (initiator)")
+        dev[1].dpp_pkex_init(identifier="test", code="secret")
         wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
 
 def test_dpp_pkex_config2(dev, apdev):
@@ -2556,32 +2443,16 @@ def test_dpp_pkex_config2(dev, apdev):
 def run_dpp_pkex2(dev, apdev, curve=None, init_extra=""):
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex", curve=curve)
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex", curve=curve)
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437, role="configurator")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 role=enrollee %s code=secret" % (id1, init_extra)
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret", curve=curve,
+                         listen_role="configurator")
+    dev[1].dpp_pkex_init(identifier="test", code="secret", role="enrollee",
+                         curve=curve, extra=init_extra)
     wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1])
 
 def test_dpp_pkex_no_responder(dev, apdev):
     """DPP and PKEX with no responder (retry behavior)"""
     check_dpp_capab(dev[0])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d init=1 identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
+    dev[0].dpp_pkex_init(identifier="test", code="secret")
 
     for i in range(15):
         ev = dev[0].wait_event(["DPP-TX ", "DPP-FAIL"], timeout=5)
@@ -2596,22 +2467,9 @@ def test_dpp_pkex_no_responder(dev, apdev):
 def test_dpp_pkex_after_retry(dev, apdev):
     """DPP and PKEX completing after retry"""
     check_dpp_capab(dev[0])
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d init=1 identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
-
+    dev[0].dpp_pkex_init(identifier="test", code="secret")
     time.sleep(0.1)
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id1)
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[1].dpp_listen(2437)
+    dev[1].dpp_pkex_resp(2437, identifier="test", code="secret")
     wait_auth_success(dev[1], dev[0], configurator=dev[0], enrollee=dev[1],
                       allow_enrollee_failure=True)
 
@@ -2621,26 +2479,15 @@ def test_dpp_pkex_hostapd_responder(dev, apdev):
     hapd = hostapd.add_ap(apdev[0], {"ssid": "unconfigured",
                                      "channel": "6"})
     check_dpp_capab(hapd)
-
-    id_h = hapd.dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id_h)
-    res = hapd.request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder/hostapd)")
+    hapd.dpp_pkex_resp(2437, identifier="test", code="secret")
 
     cmd = "DPP_CONFIGURATOR_ADD"
     res = dev[0].request(cmd)
     if "FAIL" in res:
         raise Exception("Failed to add configurator")
     conf_id = int(res)
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=ap-dpp configurator=%d code=secret" % (id0, conf_id)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator/wpa_supplicant)")
+    dev[0].dpp_pkex_init(identifier="test", code="secret",
+                         extra="conf=ap-dpp configurator=%d" % conf_id)
     wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd,
                       stop_initiator=True)
 
@@ -2657,23 +2504,11 @@ def test_dpp_pkex_hostapd_initiator(dev, apdev):
         raise Exception("Failed to add configurator")
     conf_id = int(res)
 
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-
     dev[0].set("dpp_configurator_params",
                " conf=ap-dpp configurator=%d" % conf_id)
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder/wpa_supplicant)")
-
-    dev[0].dpp_listen(2437, role="configurator")
-    id_h = hapd.dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 role=enrollee code=secret" % (id_h)
-    res = hapd.request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator/hostapd)")
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret",
+                         listen_role="configurator")
+    hapd.dpp_pkex_init(identifier="test", code="secret", role="enrollee")
     wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd,
                       stop_initiator=True)
 
@@ -3597,20 +3432,8 @@ def run_dpp_proto_init_pkex(dev, test_dev, test):
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
     dev[test_dev].set("dpp_test", str(test))
-
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-    res = dev[0].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (responder)")
-    dev[0].dpp_listen(2437)
-
-    cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 code=secret" % id1
-    res = dev[1].request(cmd)
-    if "FAIL" in res:
-        raise Exception("Failed to set PKEX data (initiator)")
+    dev[0].dpp_pkex_resp(2437, identifier="test", code="secret")
+    dev[1].dpp_pkex_init(identifier="test", code="secret")
 
 def test_dpp_proto_after_wrapped_data_pkex_cr_req(dev, apdev):
     """DPP protocol testing - attribute after Wrapped Data in PKEX CR Req"""
@@ -3956,8 +3779,8 @@ def test_dpp_pkex_alloc_fail(dev, apdev):
         raise Exception("Failed to add configurator")
     conf_id = int(res)
 
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
+    id0 = None
+    id1 = None
 
     # Local error cases on the Initiator
     tests = [(1, "dpp_get_pubkey_point"),
@@ -3998,16 +3821,14 @@ def test_dpp_pkex_alloc_fail(dev, apdev):
         dev[1].request("DPP_STOP_LISTEN")
         dev[0].dump_monitor()
         dev[1].dump_monitor()
-
-        cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-        res = dev[0].request(cmd)
-        if "FAIL" in res:
-            raise Exception("Failed to set PKEX data (responder)")
-        dev[0].dpp_listen(2437)
+        id0 = dev[0].dpp_pkex_resp(2437, identifier="test", code="secret",
+                                   use_id=id0)
 
         with alloc_fail(dev[1], count, func):
-            cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id)
-            dev[1].request(cmd)
+            id1 = dev[1].dpp_pkex_init(identifier="test", code="secret",
+                                       use_id=id1,
+                                       extra="conf=sta-dpp configurator=%d" % conf_id,
+                                       allow_fail=True)
             wait_fail_trigger(dev[1], "GET_ALLOC_FAIL", max_iter=100)
             ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01)
             if ev:
@@ -4050,16 +3871,13 @@ def test_dpp_pkex_alloc_fail(dev, apdev):
         dev[1].request("DPP_STOP_LISTEN")
         dev[0].dump_monitor()
         dev[1].dump_monitor()
-
-        cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-        res = dev[0].request(cmd)
-        if "FAIL" in res:
-            raise Exception("Failed to set PKEX data (responder)")
-        dev[0].dpp_listen(2437)
+        id0 = dev[0].dpp_pkex_resp(2437, identifier="test", code="secret",
+                                   use_id=id0)
 
         with alloc_fail(dev[0], count, func):
-            cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id)
-            dev[1].request(cmd)
+            id1 = dev[1].dpp_pkex_init(identifier="test", code="secret",
+                                       use_id=id1,
+                                       extra="conf=sta-dpp configurator=%d" % conf_id)
             wait_fail_trigger(dev[0], "GET_ALLOC_FAIL", max_iter=100)
             ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01)
             if ev:
@@ -4093,8 +3911,8 @@ def test_dpp_pkex_test_fail(dev, apdev):
         raise Exception("Failed to add configurator")
     conf_id = int(res)
 
-    id0 = dev[0].dpp_bootstrap_gen(type="pkex")
-    id1 = dev[1].dpp_bootstrap_gen(type="pkex")
+    id0 = None
+    id1 = None
 
     # Local error cases on the Initiator
     tests = [(1, "aes_siv_encrypt;dpp_auth_build_req"),
@@ -4120,16 +3938,14 @@ def test_dpp_pkex_test_fail(dev, apdev):
         dev[1].request("DPP_STOP_LISTEN")
         dev[0].dump_monitor()
         dev[1].dump_monitor()
-
-        cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-        res = dev[0].request(cmd)
-        if "FAIL" in res:
-            raise Exception("Failed to set PKEX data (responder)")
-        dev[0].dpp_listen(2437)
+        id0 = dev[0].dpp_pkex_resp(2437, identifier="test", code="secret",
+                                   use_id=id0)
 
         with fail_test(dev[1], count, func):
-            cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id)
-            dev[1].request(cmd)
+            id1 = dev[1].dpp_pkex_init(identifier="test", code="secret",
+                                       use_id=id1,
+                                       extra="conf=sta-dpp configurator=%d" % conf_id,
+                                       allow_fail=True)
             wait_fail_trigger(dev[1], "GET_FAIL", max_iter=100)
             ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01)
             if ev:
@@ -4164,16 +3980,13 @@ def test_dpp_pkex_test_fail(dev, apdev):
         dev[1].request("DPP_STOP_LISTEN")
         dev[0].dump_monitor()
         dev[1].dump_monitor()
-
-        cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
-        res = dev[0].request(cmd)
-        if "FAIL" in res:
-            raise Exception("Failed to set PKEX data (responder)")
-        dev[0].dpp_listen(2437)
+        id0 = dev[0].dpp_pkex_resp(2437, identifier="test", code="secret",
+                                   use_id=id0)
 
         with fail_test(dev[0], count, func):
-            cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id)
-            dev[1].request(cmd)
+            id1 = dev[1].dpp_pkex_init(identifier="test", code="secret",
+                                       use_id=id1,
+                                       extra="conf=sta-dpp configurator=%d" % conf_id)
             wait_fail_trigger(dev[0], "GET_FAIL", max_iter=100)
             ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01)
             if ev:
index f8ce5760374f1bcb1a735a92ac38d388675a5ebc..96d2d05329476adb546e8030230b5978ee3e6ca8 100644 (file)
@@ -1436,3 +1436,41 @@ class WpaSupplicant:
             cmd += " role=" + role
         if "OK" not in self.request(cmd):
             raise Exception("Failed to start listen operation")
+
+    def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
+                      extra=None, use_id=None, allow_fail=False):
+        if use_id is None:
+            id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
+        else:
+            id1 = use_id
+        cmd = "own=%d " % id1
+        if identifier:
+            cmd += "identifier=%s " % identifier
+        cmd += "init=1 "
+        if role:
+            cmd += "role=%s " % role
+        if extra:
+            cmd += extra + " "
+        cmd += "code=%s" % code
+        res = self.request("DPP_PKEX_ADD " + cmd)
+        if allow_fail:
+            return id1
+        if "FAIL" in res:
+            raise Exception("Failed to set PKEX data (initiator)")
+        return id1
+
+    def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
+                      listen_role=None, use_id=None):
+        if use_id is None:
+            id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
+        else:
+            id0 = use_id
+        cmd = "own=%d " % id0
+        if identifier:
+            cmd += "identifier=%s " % identifier
+        cmd += "code=%s" % code
+        res = self.request("DPP_PKEX_ADD " + cmd)
+        if "FAIL" in res:
+            raise Exception("Failed to set PKEX data (responder)")
+        self.dpp_listen(freq, role=listen_role)
+        return id0