]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: sigma_dut and DPP push button first on Enrollee
authorJouni Malinen <quic_jouni@quicinc.com>
Tue, 23 Aug 2022 15:56:17 +0000 (18:56 +0300)
committerJouni Malinen <j@w1.fi>
Tue, 23 Aug 2022 15:56:17 +0000 (18:56 +0300)
Signed-off-by: Jouni Malinen <quic_jouni@quicinc.com>
tests/hwsim/test_sigma_dut.py

index 272a49bd6a69f80513915074cfcdf5eef4f0f0b4..50130eacc968bfd8b7bc6740501fd2699f0eb269 100644 (file)
@@ -59,32 +59,49 @@ def sigma_log_output(cmd):
 
 sigma_prog = None
 
-def sigma_dut_cmd(cmd, port=9000, timeout=2):
+def sigma_dut_cmd(cmd, port=9000, timeout=2, dump_dev=None):
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                          socket.IPPROTO_TCP)
-    sock.settimeout(timeout)
+    sock.settimeout(1 if dump_dev else timeout)
     addr = ('127.0.0.1', port)
     sock.connect(addr)
     sock.send(cmd.encode() + b"\r\n")
-    try:
-        res = sock.recv(1000).decode()
-        running = False
-        done = False
-        for line in res.splitlines():
-            if line.startswith("status,RUNNING"):
-                running = True
-            elif line.startswith("status,INVALID") or \
-                 line.startswith("status,ERROR") or \
-                 line.startswith("status,COMPLETE"):
-                done = True
-                res = line
-                break
-        if running and not done:
-            # Read the actual response
+    running = False
+    done = False
+    if dump_dev:
+        for i in range(timeout):
+            dump_dev.dump_monitor()
+            try:
+                res = sock.recv(1000).decode()
+                for line in res.splitlines():
+                    if line.startswith("status,RUNNING"):
+                        running = True
+                    elif line.startswith("status,INVALID") or \
+                         line.startswith("status,ERROR") or \
+                         line.startswith("status,COMPLETE"):
+                        done = True
+                        res = line
+                        break
+            except socket.timeout as e:
+                pass
+    if (not dump_dev) or (running and not done):
+        try:
             res = sock.recv(1000).decode()
-    except:
-        res = ''
-        pass
+            for line in res.splitlines():
+                if line.startswith("status,RUNNING"):
+                    running = True
+                elif line.startswith("status,INVALID") or \
+                     line.startswith("status,ERROR") or \
+                     line.startswith("status,COMPLETE"):
+                    done = True
+                    res = line
+                    break
+                if running and not done:
+                    # Read the actual response
+                    res = sock.recv(1000).decode()
+        except:
+            res = ''
+            pass
     sock.close()
     res = res.rstrip()
     logger.debug("sigma_dut: '%s' --> '%s'" % (cmd, res))
@@ -4195,6 +4212,44 @@ def test_sigma_dut_dpp_pb_sta(dev, apdev):
         stop_sigma_dut(sigma)
         dev[0].set("dpp_config_processing", "0")
 
+def dpp_ap_pb_delayed_start(hapd):
+    time.sleep(10)
+    if "OK" not in hapd.request("DPP_PUSH_BUTTON"):
+        raise Exception("Failed to press push button on the AP")
+
+def test_sigma_dut_dpp_pb_sta_first(dev, apdev):
+    """sigma_dut DPP/PB station first"""
+    check_dpp_capab(dev[0], min_ver=3)
+    check_sae_capab(dev[0])
+
+    params = {"ssid": "sae",
+              "wpa": "2",
+              "wpa_key_mgmt": "SAE",
+              "ieee80211w": "2",
+              "rsn_pairwise": "CCMP",
+              "sae_password": "sae-password"}
+    hapd = hostapd.add_ap(apdev[0], params)
+
+    ifname = dev[0].ifname
+    sigma = start_sigma_dut(ifname)
+    try:
+        t = threading.Thread(target=dpp_ap_pb_delayed_start, args=(hapd,))
+        t.start()
+
+        sigma_dut_cmd_check("sta_reset_default,interface,%s,prog,DPP" % ifname)
+
+        cmd = "dev_exec_action,program,DPP,DPPActionType,AutomaticDPP,DPPAuthRole,Responder,DPPProvisioningRole,Enrollee,DPPBS,PBBS,DPPTimeout,50,DPPWaitForConnect,Yes"
+        res = sigma_dut_cmd(cmd, timeout=60, dump_dev=dev[0])
+        t.join()
+        if "BootstrapResult,OK,AuthResult,OK,ConfResult,OK,NetworkConnectResult,OK" not in res:
+            raise Exception("Unexpected result: " + res)
+        ev = hapd.wait_event(["DPP-PB-RESULT"], timeout=1)
+        if ev is None or "success" not in ev:
+            raise Exception("Push button bootstrapping did not succeed on AP")
+    finally:
+        stop_sigma_dut(sigma)
+        dev[0].set("dpp_config_processing", "0")
+
 def dpp_ap_pb_overlap(hapd, hapd2, dev0):
     if "OK" not in hapd.request("DPP_PUSH_BUTTON"):
         raise Exception("Failed to press push button on the AP")