]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: Use a helper function for DPP_AUTH_INIT
authorJouni Malinen <jouni@codeaurora.org>
Mon, 18 Mar 2019 09:36:43 +0000 (11:36 +0200)
committerJouni Malinen <j@w1.fi>
Mon, 18 Mar 2019 16:32:31 +0000 (18:32 +0200)
Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
tests/hwsim/hostapd.py
tests/hwsim/test_dpp.py
tests/hwsim/wpasupplicant.py

index 61db918d7f39373d6452d79200aeebfad1ed0407..5084571cd2240ffa06706998f6a89dd493f8dc10 100644 (file)
@@ -418,6 +418,37 @@ class Hostapd:
         if "OK" not in self.request(cmd):
             raise Exception("Failed to start listen operation")
 
+    def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
+                      extra=None, own=None, role=None, neg_freq=None,
+                      ssid=None, passphrase=None, expect_fail=False):
+        cmd = "DPP_AUTH_INIT"
+        if peer is None:
+            peer = self.dpp_qr_code(uri)
+        cmd += " peer=%d" % peer
+        if own is not None:
+            cmd += " own=%d" % own
+        if role:
+            cmd += " role=" + role
+        if extra:
+            cmd += " " + extra
+        if conf:
+            cmd += " conf=" + conf
+        if configurator is not None:
+            cmd += " configurator=%d" % configurator
+        if neg_freq:
+            cmd += " neg_freq=%d" % neg_freq
+        if ssid:
+            cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
+        if passphrase:
+            cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
+        res = self.request(cmd)
+        if expect_fail:
+            if "FAIL" not in res:
+                raise Exception("DPP authentication started unexpectedly")
+            return
+        if "OK" not in res:
+            raise Exception("Failed to initiate DPP Authentication")
+
     def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
                       extra=None, use_id=None):
         if use_id is None:
index 5e0b86519b65815c8482c1c41d3faf43bb7e213c..9bd924446a21e6f3420ab9e06c454800827d6c06 100644 (file)
@@ -210,9 +210,7 @@ def test_dpp_qr_code_curve_select(dev, apdev):
         logger.info("URI: " + uri)
 
         dev[0].dpp_listen(2412)
-        res = dev[1].dpp_qr_code(uri)
-        if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % res):
-            raise Exception("Failed to initiate DPP Authentication")
+        dev[1].dpp_auth_init(uri=uri)
         wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0],
                           allow_enrollee_failure=True, stop_responder=True,
                           stop_initiator=True)
@@ -224,14 +222,9 @@ def test_dpp_qr_code_auth_broadcast(dev, apdev):
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1")
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412)
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % id1):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0)
     wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_unicast(dev, apdev):
@@ -285,23 +278,16 @@ def run_dpp_qr_code_auth_unicast(dev, apdev, curve, netrole=None, key=None,
         if "FAIL" in res:
             raise Exception("Failed to add configurator")
         conf_id = int(res)
+    else:
+        conf_id = None
 
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True, curve=curve, key=key)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
 
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412, netrole=netrole)
-    cmd = "DPP_AUTH_INIT peer=%d" % id1
-    if init_extra:
-        cmd += " " + init_extra
-    if configurator:
-        cmd += " configurator=%d" % conf_id
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, extra=init_extra, configurator=conf_id)
     wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0],
                       allow_enrollee_failure=True,
                       allow_configurator_failure=not require_conf_success,
@@ -317,9 +303,6 @@ def test_dpp_qr_code_auth_mutual(dev, apdev):
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
 
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
     logger.info("dev1 displays QR Code")
     id1b = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri1b = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id1b)
@@ -327,10 +310,9 @@ def test_dpp_qr_code_auth_mutual(dev, apdev):
     logger.info("dev0 scans QR Code")
     id0b = dev[0].dpp_qr_code(uri1b)
 
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412)
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d own=%d" % (id1, id1b)):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, own=id1b)
 
     ev = dev[1].wait_event(["DPP-AUTH-DIRECTION"], timeout=5)
     if ev is None:
@@ -348,17 +330,13 @@ def test_dpp_qr_code_auth_mutual2(dev, apdev):
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
 
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
     logger.info("dev1 displays QR Code")
     id1b = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri1b = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id1b)
 
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412, qr="mutual")
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d own=%d" % (id1, id1b)):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, own=id1b)
 
     ev = dev[1].wait_event(["DPP-RESPONSE-PENDING"], timeout=5)
     if ev is None:
@@ -408,14 +386,9 @@ def run_dpp_qr_code_auth_mutual(dev, apdev, curve):
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True, curve=curve)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412, qr="mutual")
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % (id1)):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0)
 
     ev = dev[1].wait_event(["DPP-RESPONSE-PENDING"], timeout=5)
     if ev is None:
@@ -447,18 +420,12 @@ def test_dpp_auth_resp_retries(dev, apdev):
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
     logger.info("dev1 displays QR Code")
     id1b = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri1b = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id1b)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412, qr="mutual")
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d own=%d" % (id1, id1b)):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, own=id1b)
 
     ev = dev[1].wait_event(["DPP-RESPONSE-PENDING"], timeout=5)
     if ev is None:
@@ -497,20 +464,13 @@ def test_dpp_qr_code_auth_mutual_not_used(dev, apdev):
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
     logger.info("dev1 displays QR Code")
     id1b = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri1b = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id1b)
-
     logger.info("dev0 does not scan QR Code")
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412)
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d own=%d" % (id1, id1b)):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, own=id1b)
 
     ev = dev[1].wait_event(["DPP-AUTH-DIRECTION"], timeout=5)
     if ev is None:
@@ -527,42 +487,28 @@ def test_dpp_qr_code_auth_mutual_curve_mismatch(dev, apdev):
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
     logger.info("dev1 displays QR Code")
     id1b = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True, curve="secp384r1")
     uri1b = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id1b)
-
     logger.info("dev0 scans QR Code")
     id0b = dev[0].dpp_qr_code(uri1b)
-
-    res = dev[1].request("DPP_AUTH_INIT peer=%d own=%d" % (id1, id1b))
-    if "FAIL" not in res:
-        raise Exception("DPP_AUTH_INIT accepted unexpectedly")
+    logger.info("dev1 scans QR Code")
+    dev[1].dpp_auth_init(uri=uri0, own=id1b, expect_fail=True)
 
 def test_dpp_qr_code_auth_hostapd_mutual2(dev, apdev):
     """DPP QR Code and authentication exchange (hostapd mutual2)"""
     check_dpp_capab(dev[0])
     hapd = hostapd.add_ap(apdev[0], {"ssid": "unconfigured"})
     check_dpp_capab(hapd)
-
     logger.info("AP displays QR Code")
     id_h = hapd.dpp_bootstrap_gen(chan="81/1", mac=True)
     uri_h = hapd.request("DPP_BOOTSTRAP_GET_URI %d" % id_h)
-
-    logger.info("dev0 scans QR Code")
-    id0 = dev[0].dpp_qr_code(uri_h)
-
     logger.info("dev0 displays QR Code")
     id0b = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0b)
-
-    logger.info("dev0 initiates DPP Authentication")
+    logger.info("dev0 scans QR Code and initiates DPP Authentication")
     hapd.dpp_listen(2412, qr="mutual")
-    if "OK" not in dev[0].request("DPP_AUTH_INIT peer=%d own=%d" % (id0, id0b)):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[0].dpp_auth_init(uri=uri_h, own=id0b)
 
     ev = dev[0].wait_event(["DPP-RESPONSE-PENDING"], timeout=5)
     if ev is None:
@@ -583,16 +529,11 @@ def test_dpp_qr_code_listen_continue(dev, apdev):
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2412)
     logger.info("Wait for listen to expire and get restarted")
     time.sleep(5.5)
-    logger.info("dev1 initiates DPP Authentication")
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % id1):
-        raise Exception("Failed to initiate DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0)
     wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_initiator_enrollee(dev, apdev):
@@ -611,14 +552,9 @@ def run_dpp_qr_code_auth_initiator_enrollee(dev, apdev):
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412)
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=enrollee" % id1):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, role="enrollee")
     wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1],
                       allow_enrollee_failure=True, stop_responder=True)
 
@@ -643,14 +579,9 @@ def run_dpp_qr_code_auth_initiator_either(dev, apdev, resp_role,
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412, role=resp_role)
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=either" % id1):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, role="either")
     wait_auth_success(dev[0], dev[1], configurator=conf_dev,
                       enrollee=enrollee_dev, allow_enrollee_failure=True,
                       stop_responder=True)
@@ -672,24 +603,20 @@ def run_init_incompatible_roles(dev, role="enrollee"):
 def test_dpp_qr_code_auth_incompatible_roles(dev, apdev):
     """DPP QR Code and authentication exchange (incompatible roles)"""
     id1 = run_init_incompatible_roles(dev)
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=enrollee" % id1):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(peer=id1, role="enrollee")
     ev = dev[1].wait_event(["DPP-NOT-COMPATIBLE"], timeout=5)
     if ev is None:
         raise Exception("DPP-NOT-COMPATIBLE event on initiator timed out")
     ev = dev[0].wait_event(["DPP-NOT-COMPATIBLE"], timeout=1)
     if ev is None:
         raise Exception("DPP-NOT-COMPATIBLE event on responder timed out")
-
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=configurator" % id1):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(peer=id1, role="configurator")
     wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_incompatible_roles2(dev, apdev):
     """DPP QR Code and authentication exchange (incompatible roles 2)"""
     id1 = run_init_incompatible_roles(dev, role="configurator")
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=configurator" % id1):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(peer=id1, role="configurator")
     ev = dev[1].wait_event(["DPP-NOT-COMPATIBLE"], timeout=5)
     if ev is None:
         raise Exception("DPP-NOT-COMPATIBLE event on initiator timed out")
@@ -701,8 +628,7 @@ def test_dpp_qr_code_auth_incompatible_roles_failure(dev, apdev):
     """DPP QR Code and authentication exchange (incompatible roles failure)"""
     id1 = run_init_incompatible_roles(dev, role="configurator")
     with alloc_fail(dev[0], 1, "dpp_auth_build_resp_status"):
-        if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=configurator" % id1):
-            raise Exception("Failed to initiate DPP Authentication")
+        dev[1].dpp_auth_init(peer=id1, role="configurator")
         ev = dev[0].wait_event(["DPP-NOT-COMPATIBLE"], timeout=1)
         if ev is None:
             raise Exception("DPP-NOT-COMPATIBLE event on responder timed out")
@@ -711,16 +637,14 @@ def test_dpp_qr_code_auth_incompatible_roles_failure2(dev, apdev):
     """DPP QR Code and authentication exchange (incompatible roles failure 2)"""
     id1 = run_init_incompatible_roles(dev, role="configurator")
     with alloc_fail(dev[1], 1, "dpp_auth_resp_rx_status"):
-        if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=configurator" % id1):
-            raise Exception("Failed to initiate DPP Authentication")
+        dev[1].dpp_auth_init(peer=id1, role="configurator")
         wait_fail_trigger(dev[1], "GET_ALLOC_FAIL")
 
 def test_dpp_qr_code_auth_incompatible_roles_failure3(dev, apdev):
     """DPP QR Code and authentication exchange (incompatible roles failure 3)"""
     id1 = run_init_incompatible_roles(dev, role="configurator")
     with fail_test(dev[1], 1, "dpp_auth_resp_rx_status"):
-        if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=configurator" % id1):
-            raise Exception("Failed to initiate DPP Authentication")
+        dev[1].dpp_auth_init(peer=id1, role="configurator")
         wait_dpp_fail(dev[1], "AES-SIV decryption failed")
 
 def test_dpp_qr_code_auth_neg_chan(dev, apdev):
@@ -738,15 +662,10 @@ def test_dpp_qr_code_auth_neg_chan(dev, apdev):
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d configurator=%d conf=sta-dpp neg_freq=2462" % (id1, conf_id)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, conf="sta-dpp", neg_freq=2462,
+                         configurator=conf_id)
 
     ev = dev[1].wait_event(["DPP-TX"], timeout=5)
     if ev is None:
@@ -1393,10 +1312,7 @@ def test_dpp_gas_timeout(dev, apdev):
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
 
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].set("ext_mgmt_frame_handling", "1")
     dev[0].dpp_listen(2412)
 
@@ -1404,9 +1320,7 @@ def test_dpp_gas_timeout(dev, apdev):
     conf = '{"wi-fi_tech":"infra", "discovery":{"ssid":"test"},"cred":{"akm":"psk","pass":"secret passphrase"}}' + 3000*' '
     dev[1].set("dpp_config_obj_override", conf)
 
-    cmd = "DPP_AUTH_INIT peer=%d" % id1
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0)
 
     # DPP Authentication Request
     msg = dev[0].mgmt_rx()
@@ -1709,19 +1623,13 @@ def run_dpp_ap_config(dev, apdev, curve=None, conf_curve=None,
         if "FAIL" in csign or len(csign) == 0:
             raise Exception("DPP_CONFIGURATOR_GET_KEY failed")
 
-    id = dev[0].dpp_qr_code(uri)
-
-    cmd = "DPP_AUTH_INIT peer=%d conf=ap-dpp configurator=%d" % (id, conf_id)
-    if "OK" not in dev[0].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[0].dpp_auth_init(uri=uri, conf="ap-dpp", configurator=conf_id)
     wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd)
     update_hapd_config(hapd)
 
     id1 = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True, curve=curve)
     uri1 = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id1)
 
-    id0b = dev[0].dpp_qr_code(uri1)
-
     if reconf_configurator:
         res = dev[0].request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
         if "OK" not in res:
@@ -1736,9 +1644,7 @@ def run_dpp_ap_config(dev, apdev, curve=None, conf_curve=None,
         conf_id = int(res)
 
     dev[1].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d" % (id0b, conf_id)
-    if "OK" not in dev[0].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[0].dpp_auth_init(uri=uri1, conf="sta-dpp", configurator=conf_id)
     wait_auth_success(dev[1], dev[0], configurator=dev[0], enrollee=dev[1],
                       stop_responder=True)
 
@@ -1834,19 +1740,9 @@ def run_dpp_auto_connect(dev, apdev, processing):
     dev[0].set("dpp_config_processing", str(processing))
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d" % (id1, conf_id)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=10)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=2)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
+    dev[1].dpp_auth_init(uri=uri0, conf="sta-dpp", configurator=conf_id)
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0])
     ev = dev[0].wait_event(["DPP-NETWORK-ID"], timeout=1)
     if ev is None:
         raise Exception("DPP network profile not generated")
@@ -1923,20 +1819,10 @@ def run_dpp_auto_connect_legacy(dev, apdev, conf='sta-psk',
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
 
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d conf=%s ssid=%s pass=%s" % (id1, conf,
-        binascii.hexlify(b"dpp-legacy").decode(),
-        binascii.hexlify(b"secret passphrase").decode())
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=10)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=2)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
+    dev[1].dpp_auth_init(uri=uri0, conf=conf, ssid="dpp-legacy",
+                         passphrase="secret passphrase")
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0])
     ev = dev[0].wait_event(["DPP-NETWORK-ID"], timeout=1)
     if ev is None:
         raise Exception("DPP network profile not generated")
@@ -1964,26 +1850,13 @@ def run_dpp_auto_connect_legacy_pmf_required(dev, apdev):
     dev[0].set("dpp_config_processing", "2")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk ssid=%s pass=%s" % (id1,
-        binascii.hexlify(b"dpp-legacy").decode(),
-        binascii.hexlify(b"secret passphrase").decode())
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=10)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=2)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
+    dev[1].dpp_auth_init(uri=uri0, conf="sta-psk", ssid="dpp-legacy",
+                         passphrase="secret passphrase")
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0])
     ev = dev[0].wait_event(["DPP-NETWORK-ID"], timeout=1)
     if ev is None:
         raise Exception("DPP network profile not generated")
-    id = ev.split(' ')[1]
-
     dev[0].wait_connected()
 
 def test_dpp_qr_code_auth_responder_configurator(dev, apdev):
@@ -2006,15 +1879,10 @@ def run_dpp_qr_code_auth_responder_configurator(dev, apdev, extra):
 
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].set("dpp_configurator_params",
                " conf=sta-dpp configurator=%d%s" % (conf_id, extra))
     dev[0].dpp_listen(2412, role="configurator")
-    cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, role="enrollee")
     wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1],
                       stop_responder=True)
     dev[1].dump_monitor()
@@ -2034,15 +1902,10 @@ def test_dpp_qr_code_hostapd_init(dev, apdev):
 
     id0 = dev[0].dpp_bootstrap_gen(chan="81/6", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
     dev[0].set("dpp_configurator_params",
                " conf=ap-dpp configurator=%d" % conf_id)
     dev[0].dpp_listen(2437, role="configurator")
-    id1 = hapd.dpp_qr_code(uri0)
-
-    cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1
-    if "OK" not in hapd.request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    hapd.dpp_auth_init(uri=uri0, role="enrollee")
     wait_auth_success(dev[0], hapd, configurator=dev[0], enrollee=hapd,
                       stop_responder=True)
 
@@ -2072,13 +1935,7 @@ def run_dpp_qr_code_hostapd_init_offchannel(dev, apdev, extra):
     dev[0].set("dpp_configurator_params",
                " conf=ap-dpp configurator=%d" % conf_id)
     dev[0].dpp_listen(2462, role="configurator")
-    id1 = hapd.dpp_qr_code(uri0)
-
-    cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1
-    if extra:
-        cmd += " " + extra
-    if "OK" not in hapd.request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    hapd.dpp_auth_init(uri=uri0, role="enrollee", extra=extra)
     wait_auth_success(dev[0], hapd, configurator=dev[0], enrollee=hapd,
                       stop_responder=True)
 
@@ -2111,13 +1968,9 @@ def test_dpp_test_vector_p_256(dev, apdev):
 
     dev[1].set("dpp_nonce_override", "13f4602a16daeb69712263b9c46cba31")
 
-    id1peer = dev[1].dpp_qr_code(uri0)
-    id0peer = dev[0].dpp_qr_code(uri1)
-
+    dev[0].dpp_qr_code(uri1)
     dev[0].dpp_listen(2462, qr="mutual")
-    cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate operation")
+    dev[1].dpp_auth_init(uri=uri0, own=id1, neg_freq=2412)
     wait_auth_success(dev[0], dev[1])
 
 def test_dpp_test_vector_p_256_b(dev, apdev):
@@ -2149,12 +2002,8 @@ def test_dpp_test_vector_p_256_b(dev, apdev):
 
     dev[1].set("dpp_nonce_override", "13f4602a16daeb69712263b9c46cba31")
 
-    id1peer = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2462)
-    cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate operation")
+    dev[1].dpp_auth_init(uri=uri0, own=id1, neg_freq=2412)
     wait_auth_success(dev[0], dev[1])
 
 def der_priv_key_p_521(priv):
@@ -2194,13 +2043,9 @@ def test_dpp_test_vector_p_521(dev, apdev):
     dev[1].set("dpp_nonce_override",
                "de972af3847bec3ba2aedd9f5c21cfdec7bf0bc5fe8b276cbcd0267807fb15b0")
 
-    id1peer = dev[1].dpp_qr_code(uri0)
-    id0peer = dev[0].dpp_qr_code(uri1)
-
+    dev[0].dpp_qr_code(uri1)
     dev[0].dpp_listen(2462, qr="mutual")
-    cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate operation")
+    dev[1].dpp_auth_init(uri=uri0, own=id1, neg_freq=2412)
     wait_auth_success(dev[0], dev[1])
 
 def test_dpp_pkex(dev, apdev):
@@ -2545,8 +2390,7 @@ def test_dpp_hostapd_configurator(dev, apdev):
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
 
     id1 = hapd.dpp_qr_code(uri0)
-
-    res = hapd.request("DPP_BOOTSTRAP_INFO %d" % id0)
+    res = hapd.request("DPP_BOOTSTRAP_INFO %d" % id1)
     if "FAIL" in res:
         raise Exception("DPP_BOOTSTRAP_INFO failed")
     if "type=QRCODE" not in res:
@@ -2555,9 +2399,7 @@ def test_dpp_hostapd_configurator(dev, apdev):
         raise Exception("DPP_BOOTSTRAP_INFO did not report correct mac_addr")
 
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d configurator=%d conf=sta-dpp" % (id1, conf_id)
-    if "OK" not in hapd.request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    hapd.dpp_auth_init(peer=id1, configurator=conf_id, conf="sta-dpp")
     wait_auth_success(dev[0], hapd, configurator=hapd, enrollee=dev[0],
                       stop_responder=True)
 
@@ -2579,12 +2421,7 @@ def test_dpp_hostapd_configurator_responder(dev, apdev):
 
     id0 = hapd.dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = hapd.request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[0].dpp_qr_code(uri0)
-
-    cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % (id1)
-    if "OK" not in dev[0].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[0].dpp_auth_init(uri=uri0, role="enrollee")
     wait_auth_success(hapd, dev[0], configurator=hapd, enrollee=dev[0],
                       stop_initiator=True)
 
@@ -2610,7 +2447,7 @@ def test_dpp_own_config_curve_mismatch(dev, apdev):
         dev[0].set("dpp_config_processing", "0")
 
 def run_dpp_own_config(dev, apdev, own_curve=None, expect_failure=False,
-                       extra=""):
+                       extra=None):
     check_dpp_capab(dev[0], own_curve and "BP" in own_curve)
     hapd = hostapd.add_ap(apdev[0], {"ssid": "unconfigured"})
     check_dpp_capab(hapd)
@@ -2624,11 +2461,8 @@ def run_dpp_own_config(dev, apdev, own_curve=None, expect_failure=False,
         raise Exception("Failed to add configurator")
     conf_id = int(res)
 
-    id = dev[0].dpp_qr_code(uri)
-
-    cmd = "DPP_AUTH_INIT peer=%d conf=ap-dpp configurator=%d%s" % (id, conf_id, extra)
-    if "OK" not in dev[0].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[0].dpp_auth_init(uri=uri, conf="ap-dpp", configurator=conf_id,
+                         extra=extra)
     wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd)
     update_hapd_config(hapd)
 
@@ -2674,7 +2508,7 @@ def test_dpp_own_config_ap_reconf(dev, apdev):
     finally:
         dev[0].set("dpp_config_processing", "0")
 
-def run_dpp_own_config_ap(dev, apdev, reconf_configurator=False, extra=""):
+def run_dpp_own_config_ap(dev, apdev, reconf_configurator=False, extra=None):
     check_dpp_capab(dev[0])
     hapd = hostapd.add_ap(apdev[0], {"ssid": "unconfigured"})
     check_dpp_capab(hapd)
@@ -2708,14 +2542,10 @@ def run_dpp_own_config_ap(dev, apdev, reconf_configurator=False, extra=""):
 
     id = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id)
-
-    id = hapd.dpp_qr_code(uri)
-
     dev[0].set("dpp_config_processing", "2")
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d%s" % (id, conf_id, extra)
-    if "OK" not in hapd.request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    hapd.dpp_auth_init(uri=uri, conf="sta-dpp", configurator=conf_id,
+                       extra=extra)
     wait_auth_success(dev[0], hapd, configurator=hapd, enrollee=dev[0])
     dev[0].wait_connected()
 
@@ -2751,74 +2581,40 @@ def run_dpp_intro_mismatch(dev, apdev, wpas):
         raise Exception("Failed to add configurator")
     conf_id = int(res)
 
-    id = dev[1].dpp_qr_code(uri)
-
     dev[1].set("dpp_groups_override", '[{"groupId":"a","netRole":"ap"}]')
-    cmd = "DPP_AUTH_INIT peer=%d conf=ap-dpp configurator=%d" % (id, conf_id)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri, conf="ap-dpp", configurator=conf_id)
     update_hapd_config(hapd)
 
     logger.info("Provision STA0 with DPP Connector that has mismatching groupId")
     dev[0].set("dpp_config_processing", "2")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2412)
     dev[1].set("dpp_groups_override", '[{"groupId":"b","netRole":"sta"}]')
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d" % (id1, conf_id)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator for STA0)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee STA0)")
+    dev[1].dpp_auth_init(uri=uri0, conf="sta-dpp", configurator=conf_id)
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0])
 
     logger.info("Provision STA2 with DPP Connector that has mismatching C-sign-key")
     dev[2].set("dpp_config_processing", "2")
     id2 = dev[2].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri2 = dev[2].request("DPP_BOOTSTRAP_GET_URI %d" % id2)
-
-    id1 = dev[1].dpp_qr_code(uri2)
-
     dev[2].dpp_listen(2412)
     res = dev[1].request("DPP_CONFIGURATOR_ADD")
     if "FAIL" in res:
         raise Exception("Failed to add configurator")
     conf_id_2 = int(res)
     dev[1].set("dpp_groups_override", '')
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d" % (id1, conf_id_2)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator for STA2)")
-    ev = dev[2].wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee STA2)")
+    dev[1].dpp_auth_init(uri=uri2, conf="sta-dpp", configurator=conf_id_2)
+    wait_auth_success(dev[2], dev[1], configurator=dev[1], enrollee=dev[2])
 
     logger.info("Provision STA5 with DPP Connector that has mismatching netAccessKey EC group")
     wpas.set("dpp_config_processing", "2")
     id5 = wpas.dpp_bootstrap_gen(chan="81/1", mac=True, curve="P-521")
     uri5 = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % id5)
-
-    id1 = dev[1].dpp_qr_code(uri5)
-
     wpas.dpp_listen(2412)
     dev[1].set("dpp_groups_override", '')
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d" % (id1, conf_id)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator for STA0)")
-    ev = wpas.wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee STA5)")
+    dev[1].dpp_auth_init(uri=uri5, conf="sta-dpp", configurator=conf_id)
+    wait_auth_success(wpas, dev[1], configurator=dev[1], enrollee=wpas)
 
     logger.info("Verify network introduction results")
     ev = dev[0].wait_event(["DPP-INTRO"], timeout=10)
@@ -2858,8 +2654,6 @@ def run_dpp_proto_init(dev, test_dev, test, mutual=False, unicast=True,
     id0 = dev[0].dpp_bootstrap_gen(chan=chan, mac=unicast)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
 
-    id1 = dev[1].dpp_qr_code(uri0)
-
     if mutual:
         id1b = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True)
         uri1b = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id1b)
@@ -2884,16 +2678,22 @@ def run_dpp_proto_init(dev, test_dev, test, mutual=False, unicast=True,
     if listen:
         dev[0].dpp_listen(2412, qr=qr, role=role)
 
+    role = None
+    configurator = None
+    conf = None
+    own = None
+
     if init_enrollee:
-        cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % (id1)
+        role="enrollee"
     else:
-        cmd = "DPP_AUTH_INIT peer=%d configurator=%d conf=sta-dpp" % (id1, conf_id)
+        configurator=conf_id
+        conf="sta-dpp"
         if incompatible_roles:
-            cmd += " role=enrollee"
+            role="enrollee"
     if mutual:
-        cmd += " own=%d" % id1b
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+        own = id1b
+    dev[1].dpp_auth_init(uri=uri0, role=role, configurator=configurator,
+                         conf=conf, own=own)
 
 def test_dpp_proto_after_wrapped_data_auth_req(dev, apdev):
     """DPP protocol testing - attribute after Wrapped Data in Auth Req"""
@@ -3674,15 +3474,9 @@ def run_dpp_qr_code_chan_list(dev, apdev, unicast, listen_freq, chanlist,
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan=chanlist, mac=unicast)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(listen_freq)
-    cmd = "DPP_AUTH_INIT peer=%d" % id1
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0)
     if no_wait:
         return
     wait_auth_success(dev[0], dev[1], timeout=timeout, configurator=dev[1],
@@ -3693,15 +3487,9 @@ def test_dpp_qr_code_chan_list_no_match(dev, apdev):
     """DPP QR Code and no matching supported channel"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
     id0 = dev[0].dpp_bootstrap_gen(chan="123/123")
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    cmd = "DPP_AUTH_INIT peer=%d" % id1
-    if "FAIL" not in dev[1].request(cmd):
-        raise Exception("DPP Authentication started unexpectedly")
+    dev[1].dpp_auth_init(uri=uri0, expect_fail=True)
 
 def test_dpp_pkex_alloc_fail(dev, apdev):
     """DPP/PKEX and memory allocation failures"""
@@ -3997,16 +3785,12 @@ def start_dpp(dev):
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
 
-    id1 = dev[1].dpp_qr_code(uri0)
-
     conf = '{"wi-fi_tech":"infra", "discovery":{"ssid":"test"},"cred":{"akm":"psk","pass":"secret passphrase"}}' + 3000*' '
     dev[0].set("dpp_config_obj_override", conf)
 
     dev[0].set("ext_mgmt_frame_handling", "1")
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0, role="enrollee")
 
 def test_dpp_gas_timeout_handling(dev, apdev):
     """DPP and GAS timeout handling"""
@@ -4165,17 +3949,11 @@ def test_dpp_bootstrap_key_autogen_issues(dev, apdev):
     logger.info("dev1 initiates DPP Authentication")
     dev[0].dpp_listen(2412)
     with alloc_fail(dev[1], 1, "dpp_autogen_bootstrap_key"):
-        cmd = "DPP_AUTH_INIT peer=%d" % id1
-        if "FAIL" not in dev[1].request(cmd):
-            raise Exception("Failure not reported")
+        dev[1].dpp_auth_init(peer=id1, expect_fail=True)
     with alloc_fail(dev[1], 2, "=dpp_autogen_bootstrap_key"):
-        cmd = "DPP_AUTH_INIT peer=%d" % id1
-        if "FAIL" not in dev[1].request(cmd):
-            raise Exception("Failure not reported")
+        dev[1].dpp_auth_init(peer=id1, expect_fail=True)
     with fail_test(dev[1], 1, "dpp_keygen;dpp_autogen_bootstrap_key"):
-        cmd = "DPP_AUTH_INIT peer=%d" % id1
-        if "FAIL" not in dev[1].request(cmd):
-            raise Exception("Failure not reported")
+        dev[1].dpp_auth_init(peer=id1, expect_fail=True)
     dev[0].request("DPP_STOP_LISTEN")
 
 def test_dpp_auth_resp_status_failure(dev, apdev):
@@ -4188,20 +3966,13 @@ def test_dpp_auth_resp_aes_siv_issue(dev, apdev):
     """DPP Auth Resp AES-SIV issue"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
     logger.info("dev0 displays QR Code")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    logger.info("dev1 scans QR Code")
-    id1 = dev[1].dpp_qr_code(uri0)
-
-    logger.info("dev1 initiates DPP Authentication")
+    logger.info("dev1 scans QR Code and initiates DPP Authentication")
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d" % id1
     with fail_test(dev[1], 1, "aes_siv_decrypt;dpp_auth_resp_rx"):
-        if "OK" not in dev[1].request(cmd):
-            raise Exception("Failed to initiate DPP Authentication")
+        dev[1].dpp_auth_init(uri=uri0)
         wait_dpp_fail(dev[1], "AES-SIV decryption failed")
     dev[0].request("DPP_STOP_LISTEN")
 
@@ -4209,35 +3980,23 @@ def test_dpp_invalid_legacy_params(dev, apdev):
     """DPP invalid legacy parameters"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
     # No pass/psk
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk ssid=%s" % (id1, binascii.hexlify(b"dpp-legacy").decode())
-    if "FAIL" not in dev[1].request(cmd):
-        raise Exception("Invalid command not rejected")
+    dev[1].dpp_auth_init(uri=uri0, conf="sta-psk", ssid="dpp-legacy",
+                         expect_fail=True)
 
 def test_dpp_invalid_legacy_params2(dev, apdev):
     """DPP invalid legacy parameters 2"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].set("dpp_configurator_params",
                " conf=sta-psk ssid=%s" % (binascii.hexlify(b"dpp-legacy").decode()))
     dev[0].dpp_listen(2412, role="configurator")
-
+    dev[1].dpp_auth_init(uri=uri0, role="enrollee")
     # No pass/psk
-    cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
     ev = dev[0].wait_event(["DPP: Failed to set configurator parameters"],
                            timeout=5)
     if ev is None:
@@ -4247,19 +4006,12 @@ def test_dpp_legacy_params_failure(dev, apdev):
     """DPP legacy parameters local failure"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk pass=%s ssid=%s" % (id1,
-        binascii.hexlify(b"passphrase").decode(),
-        binascii.hexlify(b"dpp-legacy").decode())
     with alloc_fail(dev[1], 1, "dpp_build_conf_obj_legacy"):
-        if "OK" not in dev[1].request(cmd):
-            raise Exception("Failed to initiate DPP")
+        dev[1].dpp_auth_init(uri=uri0, conf="sta-psk", passphrase="passphrase",
+                             ssid="dpp-legacy")
         ev = dev[0].wait_event(["DPP-CONF-FAILED"], timeout=5)
         if ev is None:
             raise Exception("DPP configuration failure not reported")
@@ -4532,24 +4284,14 @@ def test_dpp_listen_continue(dev, apdev):
     """DPP and continue listen state"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
     id = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id)
-
     dev[0].dpp_listen(2412)
     time.sleep(5.1)
-
-    id = dev[1].dpp_qr_code(uri)
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % id):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-CONF-FAILED"], timeout=2)
-    if ev is None:
-        raise Exception("DPP configuration result not seen (Enrollee)")
-    ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=2)
-    if ev is None:
-        raise Exception("DPP configuration result not seen (Responder)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[1].request("DPP_STOP_LISTEN")
+    dev[1].dpp_auth_init(uri=uri)
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0],
+                      allow_enrollee_failure=True, stop_responder=True,
+                      stop_initiator=True)
 
 def test_dpp_network_addition_failure(dev, apdev):
     """DPP network addition failure"""
@@ -4600,22 +4342,14 @@ def test_dpp_two_initiators(dev, apdev):
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
     check_dpp_capab(dev[2])
-
     id = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id)
-
     dev[0].dpp_listen(2412)
-    id1 = dev[1].dpp_qr_code(uri)
-    id2 = dev[2].dpp_qr_code(uri)
-
-    if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % id1):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri)
     ev = dev[0].wait_event(["DPP-RX"], timeout=5)
     if ev is None:
         raise Exeption("No DPP Authentication Request seen")
-    if "OK" not in dev[2].request("DPP_AUTH_INIT peer=%d" % id2):
-        raise Exception("Failed to initiate DPP Authentication (2)")
-
+    dev[2].dpp_auth_init(uri=uri)
     wait_dpp_fail(dev[0],
                   "DPP-FAIL Already in DPP authentication exchange - ignore new one")
 
@@ -4659,18 +4393,12 @@ def test_dpp_duplicated_auth_resp(dev, apdev):
     """DPP and duplicated Authentication Response"""
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
-
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].set("ext_mgmt_frame_handling", "1")
     dev[1].set("ext_mgmt_frame_handling", "1")
-
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d" % id1
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
+    dev[1].dpp_auth_init(uri=uri0)
 
     # DPP Authentication Request
     rx_process_frame(dev[0])
@@ -4709,23 +4437,14 @@ def test_dpp_enrollee_reject_config(dev, apdev):
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
     dev[0].set("dpp_test", "91")
-
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-sae ssid=%s pass=%s" % (id1,
-        binascii.hexlify(b"dpp-legacy").decode(),
-        binascii.hexlify(b"secret passphrase").decode())
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-CONF-FAILED"], timeout=10)
-    if ev is None:
-        raise Exception("DPP configuration failure not reported by Configurator")
-    ev = dev[0].wait_event(["DPP-CONF-FAILED"], timeout=2)
-    if ev is None:
-        raise Exception("Forced DPP configuration failure not reported by Enrollee")
+    dev[1].dpp_auth_init(uri=uri0, conf="sta-sae", ssid="dpp-legacy",
+                         passphrase="secret passphrase")
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0],
+                      allow_enrollee_failure=True,
+                      allow_configurator_failure=True)
 
 def test_dpp_enrollee_ap_reject_config(dev, apdev):
     """DPP and Enrollee AP rejecting Config Object"""
@@ -4743,17 +4462,10 @@ def test_dpp_enrollee_ap_reject_config(dev, apdev):
 
     id_h = hapd.dpp_bootstrap_gen(chan="81/1", mac=True)
     uri = hapd.request("DPP_BOOTSTRAP_GET_URI %d" % id_h)
-    id = dev[0].dpp_qr_code(uri)
-
-    cmd = "DPP_AUTH_INIT peer=%d conf=ap-dpp configurator=%d" % (id, conf_id)
-    if "OK" not in dev[0].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-CONF-FAILED"], timeout=10)
-    if ev is None:
-        raise Exception("DPP configuration failure not reported by Configurator")
-    ev = hapd.wait_event(["DPP-CONF-FAILED"], timeout=2)
-    if ev is None:
-        raise Exception("Forced DPP configuration failure not reported by Enrollee")
+    dev[0].dpp_auth_init(uri=uri, conf="ap-dpp", configurator=conf_id)
+    wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd,
+                      allow_enrollee_failure=True,
+                      allow_configurator_failure=True)
 
 def test_dpp_legacy_and_dpp_akm(dev, apdev):
     """DPP and provisoning DPP and legacy AKMs"""
@@ -4797,21 +4509,12 @@ def run_dpp_legacy_and_dpp_akm(dev, apdev):
     dev[0].set("dpp_config_processing", "1")
     id0 = dev[0].dpp_bootstrap_gen(chan="81/1", mac=True)
     uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
-
-    id1 = dev[1].dpp_qr_code(uri0)
-
     dev[0].dpp_listen(2412)
-    cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk-sae-dpp ssid=%s pass=%s configurator=%d" % \
-          (id1, binascii.hexlify(ssid.encode()).decode(),
-           binascii.hexlify(passphrase.encode()).decode(), conf_id)
-    if "OK" not in dev[1].request(cmd):
-        raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-CONF-SENT", "DPP-CONF-FAILED"], timeout=10)
-    if ev is None or "DPP-CONF-SENT" not in ev:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=2)
-    if ev is None or "DPP-CONF-FAILED" in ev:
-        raise Exception("DPP configuration not completed (Enrollee)")
+    dev[1].dpp_auth_init(uri=uri0, conf="sta-psk-sae-dpp", ssid=ssid,
+                         passphrase=passphrase, configurator=conf_id)
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0],
+                      allow_enrollee_failure=True,
+                      allow_configurator_failure=True)
     ev = dev[0].wait_event(["DPP-NETWORK-ID"], timeout=1)
     if ev is None:
         raise Exception("DPP network profile not generated")
index 96d2d05329476adb546e8030230b5978ee3e6ca8..a6c15a1410bac96e8aee812aa1501c46de6b21ad 100644 (file)
@@ -1437,6 +1437,37 @@ class WpaSupplicant:
         if "OK" not in self.request(cmd):
             raise Exception("Failed to start listen operation")
 
+    def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
+                      extra=None, own=None, role=None, neg_freq=None,
+                      ssid=None, passphrase=None, expect_fail=False):
+        cmd = "DPP_AUTH_INIT"
+        if peer is None:
+            peer = self.dpp_qr_code(uri)
+        cmd += " peer=%d" % peer
+        if own is not None:
+            cmd += " own=%d" % own
+        if role:
+            cmd += " role=" + role
+        if extra:
+            cmd += " " + extra
+        if conf:
+            cmd += " conf=" + conf
+        if configurator is not None:
+            cmd += " configurator=%d" % configurator
+        if neg_freq:
+            cmd += " neg_freq=%d" % neg_freq
+        if ssid:
+            cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
+        if passphrase:
+            cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
+        res = self.request(cmd)
+        if expect_fail:
+            if "FAIL" not in res:
+                raise Exception("DPP authentication started unexpectedly")
+            return
+        if "OK" not in res:
+            raise Exception("Failed to initiate DPP Authentication")
+
     def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
                       extra=None, use_id=None, allow_fail=False):
         if use_id is None: