]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: DPP use the wait_auth_success() helper function
authorJouni Malinen <j@w1.fi>
Sun, 17 Mar 2019 16:37:56 +0000 (18:37 +0200)
committerJouni Malinen <j@w1.fi>
Sun, 17 Mar 2019 17:17:31 +0000 (19:17 +0200)
Use the already existing helper function and extend it to cover the most
common test sequences.

Signed-off-by: Jouni Malinen <j@w1.fi>
tests/hwsim/test_dpp.py

index e845fa4a0d8f0f1b264647d678ce1cb5e0f65776..19272af76ea260df3e14b3f39761055ee824a887 100644 (file)
@@ -206,22 +206,9 @@ def test_dpp_qr_code_curve_select(dev, apdev):
         res = dev[1].dpp_qr_code(uri)
         if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % res):
             raise Exception("Failed to initiate DPP Authentication")
-        ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-        if ev is None:
-            raise Exception("DPP authentication did not succeed (Responder)")
-        ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-        if ev is None:
-            raise Exception("DPP authentication did not succeed (Initiator)")
-        ev = dev[0].wait_event(["DPP-CONF-FAILED"], timeout=2)
-        if ev is None:
-            raise Exception("DPP configuration result not seen (Enrollee)")
-        ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=2)
-        if ev is None:
-            raise Exception("DPP configuration result not seen (Responder)")
-        dev[0].request("DPP_STOP_LISTEN")
-        dev[1].request("DPP_STOP_LISTEN")
-        dev[0].dump_monitor()
-        dev[1].dump_monitor()
+        wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0],
+                          allow_enrollee_failure=True, stop_responder=True,
+                          stop_initiator=True)
 
 def test_dpp_qr_code_auth_broadcast(dev, apdev):
     """DPP QR Code and authentication exchange (broadcast)"""
@@ -238,13 +225,7 @@ def test_dpp_qr_code_auth_broadcast(dev, apdev):
     dev[0].dpp_listen(2412)
     if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % id1):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_unicast(dev, apdev):
     """DPP QR Code and authentication exchange (unicast)"""
@@ -314,28 +295,11 @@ def run_dpp_qr_code_auth_unicast(dev, apdev, curve, netrole=None, key=None,
         cmd += " configurator=%d" % conf_id
     if "OK" not in dev[1].request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[1].wait_event(["DPP-CONF-SENT", "DPP-CONF-FAILED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    if "DPP-CONF-FAILED" in ev and not require_conf_failure:
-        raise Exception("Unexpected failure on Configurator")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    if require_conf_success:
-        if "DPP-CONF-FAILED" in ev:
-            raise Exception("DPP configuration failed")
-    if require_conf_failure:
-        if "DPP-CONF-SUCCESS" in ev:
-            raise Exception("DPP configuration succeeded unexpectedly")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0],
+                      allow_enrollee_failure=True,
+                      allow_configurator_failure=not require_conf_success,
+                      require_configurator_failure=require_conf_failure,
+                      stop_responder=True)
     dev[1].dump_monitor()
 
 def test_dpp_qr_code_auth_mutual(dev, apdev):
@@ -367,13 +331,7 @@ def test_dpp_qr_code_auth_mutual(dev, apdev):
     if "mutual=1" not in ev:
         raise Exception("Mutual authentication not used")
 
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_mutual2(dev, apdev):
     """DPP QR Code and authentication exchange (mutual2)"""
@@ -411,13 +369,7 @@ def test_dpp_qr_code_auth_mutual2(dev, apdev):
     if "mutual=1" not in ev:
         raise Exception("Mutual authentication not used")
 
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_mutual_p_256(dev, apdev):
     """DPP QR Code and authentication exchange (mutual, autogen P-256)"""
@@ -476,13 +428,7 @@ def run_dpp_qr_code_auth_mutual(dev, apdev, curve):
     if "mutual=1" not in ev:
         raise Exception("Mutual authentication not used")
 
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_auth_resp_retries(dev, apdev):
     """DPP Authentication Response retries"""
@@ -565,13 +511,7 @@ def test_dpp_qr_code_auth_mutual_not_used(dev, apdev):
     if "mutual=0" not in ev:
         raise Exception("Mutual authentication not used")
 
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_mutual_curve_mismatch(dev, apdev):
     """DPP QR Code and authentication exchange (mutual/mismatch)"""
@@ -627,13 +567,7 @@ def test_dpp_qr_code_auth_hostapd_mutual2(dev, apdev):
     logger.info("AP scans QR Code")
     hapd.dpp_qr_code(uri0)
 
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    hapd.request("DPP_STOP_LISTEN")
+    wait_auth_success(hapd, dev[0], stop_responder=True)
 
 def test_dpp_qr_code_listen_continue(dev, apdev):
     """DPP QR Code and listen operation needing continuation"""
@@ -652,13 +586,7 @@ def test_dpp_qr_code_listen_continue(dev, apdev):
     logger.info("dev1 initiates DPP Authentication")
     if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d" % id1):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_initiator_enrollee(dev, apdev):
     """DPP QR Code and authentication exchange (Initiator in Enrollee role)"""
@@ -684,21 +612,8 @@ def run_dpp_qr_code_auth_initiator_enrollee(dev, apdev):
     dev[0].dpp_listen(2412)
     if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=enrollee" % id1):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration did not succeed (Configurator)")
-    ev = dev[1].wait_event(["DPP-CONF-FAILED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration did not succeed (Enrollee)")
-
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1],
+                      allow_enrollee_failure=True, stop_responder=True)
 
 def test_dpp_qr_code_auth_initiator_either_1(dev, apdev):
     """DPP QR Code and authentication exchange (Initiator in either role)"""
@@ -729,21 +644,9 @@ def run_dpp_qr_code_auth_initiator_either(dev, apdev, resp_role,
     dev[0].dpp_listen(2412, role=resp_role)
     if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=either" % id1):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-
-    ev = conf_dev.wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration did not succeed (Configurator)")
-    ev = enrollee_dev.wait_event(["DPP-CONF-FAILED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration did not succeed (Enrollee)")
-
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], configurator=conf_dev,
+                      enrollee=enrollee_dev, allow_enrollee_failure=True,
+                      stop_responder=True)
 
 def run_init_incompatible_roles(dev, role="enrollee"):
     check_dpp_capab(dev[0])
@@ -773,13 +676,7 @@ def test_dpp_qr_code_auth_incompatible_roles(dev, apdev):
 
     if "OK" not in dev[1].request("DPP_AUTH_INIT peer=%d role=configurator" % id1):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    dev[0].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[0], dev[1], stop_responder=True)
 
 def test_dpp_qr_code_auth_incompatible_roles2(dev, apdev):
     """DPP QR Code and authentication exchange (incompatible roles 2)"""
@@ -900,22 +797,8 @@ def test_dpp_qr_code_auth_neg_chan(dev, apdev):
     if "freq=2462 result=SUCCESS" not in ev:
         raise Exception("Unexpected TX status for Authentication Confirm: " + ev)
 
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    if "DPP-CONF-FAILED" in ev:
-        raise Exception("DPP configuration failed")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(dev[0], dev[1], configurator=dev[1], enrollee=dev[0],
+                      stop_responder=True)
     dev[1].dump_monitor()
 
 def test_dpp_config_legacy(dev, apdev):
@@ -1532,12 +1415,7 @@ def test_dpp_gas_timeout(dev, apdev):
         msg['freq'], msg['datarate'], msg['ssi_signal'], binascii.hexlify(msg['frame']).decode())):
         raise Exception("MGMT_RX_PROCESS failed")
 
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
+    wait_auth_success(dev[0], dev[1])
 
     # DPP Configuration Response (GAS Initial Response frame)
     msg = dev[0].mgmt_rx()
@@ -1803,21 +1681,7 @@ def run_dpp_ap_config(dev, apdev, curve=None, conf_curve=None,
     cmd = "DPP_AUTH_INIT peer=%d conf=ap-dpp configurator=%d" % (id, conf_id)
     if "OK" not in dev[0].request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = hapd.wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    if "DPP-CONF-FAILED" in ev:
-        raise Exception("DPP configuration failed")
-
+    wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd)
     update_hapd_config(hapd)
 
     id1 = dev[1].dpp_bootstrap_gen(chan="81/1", mac=True, curve=curve)
@@ -1842,19 +1706,8 @@ def run_dpp_ap_config(dev, apdev, curve=None, conf_curve=None,
     cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d" % (id0b, conf_id)
     if "OK" not in dev[0].request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[1].wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[1].request("DPP_STOP_LISTEN")
+    wait_auth_success(dev[1], dev[0], configurator=dev[0], enrollee=dev[1],
+                      stop_responder=True)
 
     ev = dev[1].wait_event(["DPP-CONFOBJ-SSID"], timeout=1)
     if ev is None:
@@ -2129,20 +1982,8 @@ def run_dpp_qr_code_auth_responder_configurator(dev, apdev, extra):
     cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1
     if "OK" not in dev[1].request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[1].wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1],
+                      stop_responder=True)
     dev[1].dump_monitor()
 
 def test_dpp_qr_code_hostapd_init(dev, apdev):
@@ -2169,20 +2010,8 @@ def test_dpp_qr_code_hostapd_init(dev, apdev):
     cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1
     if "OK" not in hapd.request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(dev[0], hapd, configurator=dev[0], enrollee=hapd,
+                      stop_responder=True)
 
 def test_dpp_qr_code_hostapd_init_offchannel(dev, apdev):
     """DPP QR Code and hostapd as initiator (offchannel)"""
@@ -2217,20 +2046,8 @@ def run_dpp_qr_code_hostapd_init_offchannel(dev, apdev, extra):
         cmd += " " + extra
     if "OK" not in hapd.request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(dev[0], hapd, configurator=dev[0], enrollee=hapd,
+                      stop_responder=True)
 
 def test_dpp_test_vector_p_256(dev, apdev):
     """DPP P-256 test vector (mutual auth)"""
@@ -2268,13 +2085,7 @@ def test_dpp_test_vector_p_256(dev, apdev):
     cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1)
     if "OK" not in dev[1].request(cmd):
         raise Exception("Failed to initiate operation")
-
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
+    wait_auth_success(dev[0], dev[1])
 
 def test_dpp_test_vector_p_256_b(dev, apdev):
     """DPP P-256 test vector (Responder-only auth)"""
@@ -2311,13 +2122,7 @@ def test_dpp_test_vector_p_256_b(dev, apdev):
     cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1)
     if "OK" not in dev[1].request(cmd):
         raise Exception("Failed to initiate operation")
-
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
+    wait_auth_success(dev[0], dev[1])
 
 def der_priv_key_p_521(priv):
     if len(priv) != 2 * 66:
@@ -2363,13 +2168,7 @@ def test_dpp_test_vector_p_521(dev, apdev):
     cmd = "DPP_AUTH_INIT peer=%d own=%d neg_freq=2412" % (id1peer, id1)
     if "OK" not in dev[1].request(cmd):
         raise Exception("Failed to initiate operation")
-
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
+    wait_auth_success(dev[0], dev[1])
 
 def test_dpp_pkex(dev, apdev):
     """DPP and PKEX"""
@@ -2467,20 +2266,9 @@ def run_dpp_pkex(dev, apdev, curve=None, init_extra="", check_config=False,
             raise Exception("DPP authentication succeeded")
         return
 
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-
-    if check_config:
-        ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=5)
-        if ev is None:
-            raise Exception("DPP configuration not completed (Configurator)")
-        ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-        if ev is None:
-            raise Exception("DPP configuration not completed (Enrollee)")
+    wait_auth_success(dev[0], dev[1],
+                      configurator=dev[1] if check_config else None,
+                      enrollee=dev[0] if check_config else None)
 
 def test_dpp_pkex_5ghz(dev, apdev):
     """DPP and PKEX on 5 GHz"""
@@ -2516,12 +2304,7 @@ def run_dpp_pkex_5ghz(dev, apdev):
     if "FAIL" in res:
         raise Exception("Failed to set PKEX data (initiator)")
 
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS", "DPP-FAIL"], timeout=20)
-    if ev is None or "DPP-AUTH-SUCCESS" not in ev:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
+    wait_auth_success(dev[0], dev[1], timeout=20)
 
 def test_dpp_pkex_test_vector(dev, apdev):
     """DPP and PKEX (P-256) test vector"""
@@ -2580,13 +2363,7 @@ def test_dpp_pkex_test_vector(dev, apdev):
     res = dev[1].request(cmd)
     if "FAIL" in res:
         raise Exception("Failed to set PKEX data (initiator)")
-
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
+    wait_auth_success(dev[0], dev[1])
 
 def test_dpp_pkex_code_mismatch(dev, apdev):
     """DPP and PKEX with mismatching code"""
@@ -2620,13 +2397,7 @@ def test_dpp_pkex_code_mismatch(dev, apdev):
     res = dev[1].request(cmd)
     if "FAIL" in res:
         raise Exception("Failed to set PKEX data (initiator, retry)")
-
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator, retry)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder, retry)")
+    wait_auth_success(dev[0], dev[1])
 
 def test_dpp_pkex_code_mismatch_limit(dev, apdev):
     """DPP and PKEX with mismatching code limit"""
@@ -2799,20 +2570,7 @@ def run_dpp_pkex2(dev, apdev, curve=None, init_extra=""):
     res = dev[1].request(cmd)
     if "FAIL" in res:
         raise Exception("Failed to set PKEX data (initiator)")
-
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[1].wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
+    wait_auth_success(dev[0], dev[1], configurator=dev[0], enrollee=dev[1])
 
 def test_dpp_pkex_no_responder(dev, apdev):
     """DPP and PKEX with no responder (retry behavior)"""
@@ -2854,17 +2612,8 @@ def test_dpp_pkex_after_retry(dev, apdev):
     if "FAIL" in res:
         raise Exception("Failed to set PKEX data (responder)")
     dev[1].dpp_listen(2437)
-
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=10)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    # Ignore Enrollee result since configurator was not set here
+    wait_auth_success(dev[1], dev[0], configurator=dev[0], enrollee=dev[1],
+                      allow_enrollee_failure=True)
 
 def test_dpp_pkex_hostapd_responder(dev, apdev):
     """DPP PKEX with hostapd as responder"""
@@ -2892,21 +2641,8 @@ def test_dpp_pkex_hostapd_responder(dev, apdev):
     res = dev[0].request(cmd)
     if "FAIL" in res:
         raise Exception("Failed to set PKEX data (initiator/wpa_supplicant)")
-
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd,
+                      stop_initiator=True)
 
 def test_dpp_pkex_hostapd_initiator(dev, apdev):
     """DPP PKEX with hostapd as initiator"""
@@ -2938,21 +2674,8 @@ def test_dpp_pkex_hostapd_initiator(dev, apdev):
     res = hapd.request(cmd)
     if "FAIL" in res:
         raise Exception("Failed to set PKEX data (initiator/hostapd)")
-
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd,
+                      stop_initiator=True)
 
 def test_dpp_hostapd_configurator(dev, apdev):
     """DPP with hostapd as configurator/initiator"""
@@ -2984,21 +2707,8 @@ def test_dpp_hostapd_configurator(dev, apdev):
     cmd = "DPP_AUTH_INIT peer=%d configurator=%d conf=sta-dpp" % (id1, conf_id)
     if "OK" not in hapd.request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = hapd.wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(dev[0], hapd, configurator=hapd, enrollee=dev[0],
+                      stop_responder=True)
 
 def test_dpp_hostapd_configurator_responder(dev, apdev):
     """DPP with hostapd as configurator/responder"""
@@ -3024,21 +2734,8 @@ def test_dpp_hostapd_configurator_responder(dev, apdev):
     cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % (id1)
     if "OK" not in dev[0].request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = hapd.wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
+    wait_auth_success(hapd, dev[0], configurator=hapd, enrollee=dev[0],
+                      stop_initiator=True)
 
 def test_dpp_own_config(dev, apdev):
     """DPP configurator signing own connector"""
@@ -3081,19 +2778,7 @@ def run_dpp_own_config(dev, apdev, own_curve=None, expect_failure=False,
     cmd = "DPP_AUTH_INIT peer=%d conf=ap-dpp configurator=%d%s" % (id, conf_id, extra)
     if "OK" not in dev[0].request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = hapd.wait_event(["DPP-CONF-RECEIVED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-
+    wait_auth_success(hapd, dev[0], configurator=dev[0], enrollee=hapd)
     update_hapd_config(hapd)
 
     dev[0].set("dpp_config_processing", "1")
@@ -3180,18 +2865,7 @@ def run_dpp_own_config_ap(dev, apdev, reconf_configurator=False, extra=""):
     cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp configurator=%d%s" % (id, conf_id, extra)
     if "OK" not in hapd.request(cmd):
         raise Exception("Failed to initiate DPP Authentication")
-    ev = hapd.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = hapd.wait_event(["DPP-CONF-SENT"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Configurator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    if "DPP-CONF-RECEIVED" not in ev:
-        raise Exception("DPP configuration failed (Enrollee)")
-
+    wait_auth_success(dev[0], hapd, configurator=hapd, enrollee=dev[0])
     dev[0].wait_connected()
 
 def test_dpp_intro_mismatch(dev, apdev):
@@ -3890,12 +3564,7 @@ def test_dpp_proto_stop_at_auth_conf(dev, apdev):
 def test_dpp_proto_stop_at_auth_conf_tx(dev, apdev):
     """DPP protocol testing - stop when transmitting Auth Conf (Registrar)"""
     run_dpp_proto_init(dev, 1, 89, init_enrollee=True)
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=10)
-    if ev is None:
-        raise Exception("Authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("Authentication did not succeed (Responder)")
+    wait_auth_success(dev[0], dev[1], timeout=10)
     ev = dev[1].wait_event(["GAS-QUERY-START"], timeout=0.1)
     if ev is not None:
         raise Exception("Unexpected GAS query")
@@ -3906,12 +3575,7 @@ def test_dpp_proto_stop_at_auth_conf_tx(dev, apdev):
 def test_dpp_proto_stop_at_auth_conf_tx2(dev, apdev):
     """DPP protocol testing - stop when transmitting Auth Conf (Enrollee)"""
     run_dpp_proto_init(dev, 1, 89)
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=10)
-    if ev is None:
-        raise Exception("Authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("Authentication did not succeed (Responder)")
+    wait_auth_success(dev[0], dev[1], timeout=10)
 
     ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5)
     if ev is None or "result=TIMEOUT" not in ev:
@@ -4254,18 +3918,9 @@ def run_dpp_qr_code_chan_list(dev, apdev, unicast, listen_freq, chanlist,
         raise Exception("Failed to initiate DPP Authentication")
     if no_wait:
         return
-    ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=timeout)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Responder)")
-    ev = dev[1].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
-        raise Exception("DPP authentication did not succeed (Initiator)")
-    ev = dev[0].wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"], timeout=5)
-    if ev is None:
-        raise Exception("DPP configuration not completed (Enrollee)")
-    dev[0].request("DPP_STOP_LISTEN")
-    dev[0].dump_monitor()
-    dev[1].dump_monitor()
+    wait_auth_success(dev[0], dev[1], timeout=timeout, configurator=dev[1],
+                      enrollee=dev[0], allow_enrollee_failure=True,
+                      stop_responder=True)
 
 def test_dpp_qr_code_chan_list_no_match(dev, apdev):
     """DPP QR Code and no matching supported channel"""
@@ -4540,13 +4195,37 @@ def rx_process_frame(dev):
         raise Exception("MGMT_RX_PROCESS failed")
     return msg
 
-def wait_auth_success(responder, initiator):
-    ev = responder.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
+def wait_auth_success(responder, initiator, configurator=None, enrollee=None,
+                      allow_enrollee_failure=False,
+                      allow_configurator_failure=False,
+                      require_configurator_failure=False,
+                      timeout=5, stop_responder=False, stop_initiator=False):
+    ev = responder.wait_event(["DPP-AUTH-SUCCESS", "DPP-FAIL"], timeout=timeout)
+    if ev is None or "DPP-AUTH-SUCCESS" not in ev:
         raise Exception("DPP authentication did not succeed (Responder)")
-    ev = initiator.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
-    if ev is None:
+    ev = initiator.wait_event(["DPP-AUTH-SUCCESS", "DPP-FAIL"], timeout=5)
+    if ev is None or "DPP-AUTH-SUCCESS" not in ev:
         raise Exception("DPP authentication did not succeed (Initiator)")
+    if configurator:
+        ev = configurator.wait_event(["DPP-CONF-SENT",
+                                      "DPP-CONF-FAILED"], timeout=5)
+        if ev is None:
+            raise Exception("DPP configuration not completed (Configurator)")
+        if "DPP-CONF-FAILED" in ev and not allow_configurator_failure:
+            raise Exception("DPP configuration did not succeed (Configurator")
+        if "DPP-CONF-SUCCESS" in ev and not require_configurator_failure:
+            raise Exception("DPP configuration succeeded (Configurator")
+    if enrollee:
+        ev = enrollee.wait_event(["DPP-CONF-RECEIVED",
+                                  "DPP-CONF-FAILED"], timeout=5)
+        if ev is None:
+            raise Exception("DPP configuration not completed (Enrollee)")
+        if "DPP-CONF-FAILED" in ev and not allow_enrollee_failure:
+            raise Exception("DPP configuration did not succeed (Enrollee)")
+    if stop_responder:
+        responder.request("DPP_STOP_LISTEN")
+    if stop_initiator:
+        initiator.request("DPP_STOP_LISTEN")
 
 def wait_conf_completion(configurator, enrollee):
     ev = configurator.wait_event(["DPP-CONF-SENT"], timeout=5)