]> git.ipfire.org Git - thirdparty/hostap.git/blobdiff - tests/hwsim/wpasupplicant.py
tests: Allow overriding HT STBC capabilities
[thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
index d3f5c9f19a59368859d528452d718242a2c1199e..224686c8466fc3523d1cf476c2f14a06a250adb4 100644 (file)
@@ -19,7 +19,8 @@ wpas_ctrl = '/var/run/wpa_supplicant'
 
 class WpaSupplicant:
     def __init__(self, ifname=None, global_iface=None, hostname=None,
-                 port=9877, global_port=9878):
+                 port=9877, global_port=9878, monitor=True):
+        self.monitor = monitor
         self.hostname = hostname
         self.group_ifname = None
         self.gctrl_mon = None
@@ -37,15 +38,19 @@ class WpaSupplicant:
 
         self.global_iface = global_iface
         if global_iface:
+            self.global_mon = None
             if hostname != None:
                 self.global_ctrl = wpaspy.Ctrl(hostname, global_port)
-                self.global_mon = wpaspy.Ctrl(hostname, global_port)
+                if self.monitor:
+                    self.global_mon = wpaspy.Ctrl(hostname, global_port)
                 self.global_dbg = hostname + "/" + str(global_port) + "/"
             else:
                 self.global_ctrl = wpaspy.Ctrl(global_iface)
-                self.global_mon = wpaspy.Ctrl(global_iface)
+                if self.monitor:
+                    self.global_mon = wpaspy.Ctrl(global_iface)
                 self.global_dbg = ""
-            self.global_mon.attach()
+            if self.monitor:
+                self.global_mon.attach()
         else:
             self.global_mon = None
 
@@ -81,14 +86,17 @@ class WpaSupplicant:
         self.ifname = ifname
         if hostname != None:
             self.ctrl = wpaspy.Ctrl(hostname, port)
-            self.mon = wpaspy.Ctrl(hostname, port)
+            if self.monitor:
+                self.mon = wpaspy.Ctrl(hostname, port)
             self.host = remotehost.Host(hostname, ifname)
             self.dbg = hostname + "/" + ifname
         else:
             self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
-            self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
+            if self.monitor:
+                self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
             self.dbg = ifname
-        self.mon.attach()
+        if self.monitor:
+            self.mon.attach()
 
     def remove_ifname(self):
         if self.ifname:
@@ -250,6 +258,10 @@ class WpaSupplicant:
         if not self.ping():
             logger.info("No PING response from " + self.ifname + " after reset")
 
+    def set(self, field, value):
+        if not "OK" in self.request("SET " + field + " " + value):
+            raise Exception("Failed to set wpa_supplicant parameter " + field)
+
     def add_network(self):
         id = self.request("ADD_NETWORK")
         if "FAIL" in id:
@@ -367,7 +379,8 @@ class WpaSupplicant:
         quoted = [ "realm", "username", "password", "domain", "imsi",
                    "excluded_ssid", "milenage", "ca_cert", "client_cert",
                    "private_key", "domain_suffix_match", "provisioning_sp",
-                   "roaming_partner", "phase1", "phase2", "private_key_passwd" ]
+                   "roaming_partner", "phase1", "phase2", "private_key_passwd",
+                   "roaming_consortiums" ]
         for field in quoted:
             if field in params:
                 self.set_cred_quoted(id, field, params[field])
@@ -678,7 +691,9 @@ class WpaSupplicant:
             return None
         raise Exception("P2P_CONNECT (auth) failed")
 
-    def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
+    def p2p_go_neg_auth_result(self, timeout=None, expect_failure=False):
+        if timeout is None:
+            timeout = 1 if expect_failure else 5
         go_neg_res = None
         ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
                                      "P2P-GO-NEG-FAILURE"], timeout)
@@ -815,7 +830,8 @@ class WpaSupplicant:
             except:
                 pass
             self.gctrl_mon = None
-        ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=3)
+        timeout = 3 if self.hostname is None else 10
+        ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=timeout)
         if ev is None:
             raise Exception("Group removal event timed out")
         if "reason=GO_ENDING_SESSION" not in ev:
@@ -995,7 +1011,8 @@ class WpaSupplicant:
                    "private_key2", "phase1", "phase2", "domain_suffix_match",
                    "altsubject_match", "subject_match", "pac_file", "dh_file",
                    "bgscan", "ht_mcs", "id_str", "openssl_ciphers",
-                   "domain_match" ]
+                   "domain_match", "dpp_connector", "sae_password",
+                   "sae_password_id" ]
         for field in quoted:
             if field in kwargs and kwargs[field]:
                 self.set_network_quoted(id, field, kwargs[field])
@@ -1010,7 +1027,12 @@ class WpaSupplicant:
                        "ht40_intolerant", "update_identifier", "mac_addr",
                        "erp", "bg_scan_period", "bssid_blacklist",
                        "bssid_whitelist", "mem_only_psk", "eap_workaround",
-                       "engine" ]
+                       "engine", "fils_dh_group", "bssid_hint",
+                       "dpp_csign", "dpp_csign_expiry",
+                       "dpp_netaccesskey", "dpp_netaccesskey_expiry",
+                       "group_mgmt", "owe_group",
+                       "roaming_consortium_selection", "ocv",
+                       "multi_ap_backhaul_sta", "rx_stbc", "tx_stbc" ]
         for field in not_quoted:
             if field in kwargs and kwargs[field]:
                 self.set_network(id, field, kwargs[field])
@@ -1037,7 +1059,8 @@ class WpaSupplicant:
             self.select_network(id)
         return id
 
-    def scan(self, type=None, freq=None, no_wait=False, only_new=False):
+    def scan(self, type=None, freq=None, no_wait=False, only_new=False,
+             passive=False):
         if type:
             cmd = "SCAN TYPE=" + type
         else:
@@ -1046,21 +1069,28 @@ class WpaSupplicant:
             cmd = cmd + " freq=" + str(freq)
         if only_new:
             cmd += " only_new=1"
+        if passive:
+            cmd += " passive=1"
         if not no_wait:
             self.dump_monitor()
         if not "OK" in self.request(cmd):
             raise Exception("Failed to trigger scan")
         if no_wait:
             return
-        ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
+        ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS",
+                              "CTRL-EVENT-SCAN-FAILED"], 15)
         if ev is None:
             raise Exception("Scan timed out")
+        if "CTRL-EVENT-SCAN-FAILED" in ev:
+            raise Exception("Scan failed: " + ev)
 
-    def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False):
+    def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False,
+                     passive=False):
         if not force_scan and self.get_bss(bssid) is not None:
             return
         for i in range(0, 10):
-            self.scan(freq=freq, type="ONLY", only_new=only_new)
+            self.scan(freq=freq, type="ONLY", only_new=only_new,
+                      passive=passive)
             if self.get_bss(bssid) is not None:
                 return
         raise Exception("Could not find BSS " + bssid + " in scan")
@@ -1166,11 +1196,17 @@ class WpaSupplicant:
             if bssid not in l:
                 continue
             vals = dict()
-            [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
+            try:
+                [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
+                cache_id = None
+            except ValueError:
+                [index,aa,pmkid,expiration,opportunistic,cache_id] = l.split(' ')
             vals['index'] = index
             vals['pmkid'] = pmkid
             vals['expiration'] = expiration
             vals['opportunistic'] = opportunistic
+            if cache_id != None:
+                vals['cache_id'] = cache_id
             return vals
         return None
 
@@ -1280,3 +1316,17 @@ class WpaSupplicant:
         if "OK" not in self.global_request("%s %s adv_id=%s adv_mac=%s session=%d session_mac=%s %s" %
                                            (cmd, peer, adv_id, adv_mac, session_id, session_mac, params)):
             raise Exception("%s request failed" % cmd)
+
+    def note(self, txt):
+        self.request("NOTE " + txt)
+
+    def wait_regdom(self, country_ie=False):
+        for i in range(5):
+            ev = self.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+            if ev is None:
+                break
+            if country_ie:
+                if "init=COUNTRY_IE" in ev:
+                    break
+            else:
+                break