]> git.ipfire.org Git - thirdparty/hostap.git/blobdiff - tests/hwsim/wpasupplicant.py
tests: Allow overriding HT STBC capabilities
[thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
index 77c84f8b6e0a25235b2e7f2db21d2c2458bc2c9e..224686c8466fc3523d1cf476c2f14a06a250adb4 100644 (file)
@@ -12,17 +12,20 @@ import re
 import struct
 import wpaspy
 import remotehost
+import subprocess
 
 logger = logging.getLogger()
 wpas_ctrl = '/var/run/wpa_supplicant'
 
 class WpaSupplicant:
     def __init__(self, ifname=None, global_iface=None, hostname=None,
-                 port=9877, global_port=9878):
+                 port=9877, global_port=9878, monitor=True):
+        self.monitor = monitor
         self.hostname = hostname
         self.group_ifname = None
         self.gctrl_mon = None
         self.host = remotehost.Host(hostname, ifname)
+        self._group_dbg = None
         if ifname:
             self.set_ifname(ifname, hostname, port)
             res = self.get_driver_status()
@@ -35,18 +38,36 @@ class WpaSupplicant:
 
         self.global_iface = global_iface
         if global_iface:
+            self.global_mon = None
             if hostname != None:
                 self.global_ctrl = wpaspy.Ctrl(hostname, global_port)
-                self.global_mon = wpaspy.Ctrl(hostname, global_port)
+                if self.monitor:
+                    self.global_mon = wpaspy.Ctrl(hostname, global_port)
                 self.global_dbg = hostname + "/" + str(global_port) + "/"
             else:
                 self.global_ctrl = wpaspy.Ctrl(global_iface)
-                self.global_mon = wpaspy.Ctrl(global_iface)
+                if self.monitor:
+                    self.global_mon = wpaspy.Ctrl(global_iface)
                 self.global_dbg = ""
-            self.global_mon.attach()
+            if self.monitor:
+                self.global_mon.attach()
         else:
             self.global_mon = None
 
+    def cmd_execute(self, cmd_array, shell=False):
+        if self.hostname is None:
+            if shell:
+                cmd = ' '.join(cmd_array)
+            else:
+                cmd = cmd_array
+            proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
+                                    stdout=subprocess.PIPE, shell=shell)
+            out = proc.communicate()[0]
+            ret = proc.returncode
+            return ret, out
+        else:
+            return self.host.execute(cmd_array)
+
     def terminate(self):
         if self.global_mon:
             self.global_mon.detach()
@@ -65,14 +86,17 @@ class WpaSupplicant:
         self.ifname = ifname
         if hostname != None:
             self.ctrl = wpaspy.Ctrl(hostname, port)
-            self.mon = wpaspy.Ctrl(hostname, port)
+            if self.monitor:
+                self.mon = wpaspy.Ctrl(hostname, port)
             self.host = remotehost.Host(hostname, ifname)
             self.dbg = hostname + "/" + ifname
         else:
             self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
-            self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
+            if self.monitor:
+                self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
             self.dbg = ifname
-        self.mon.attach()
+        if self.monitor:
+            self.mon.attach()
 
     def remove_ifname(self):
         if self.ifname:
@@ -156,10 +180,26 @@ class WpaSupplicant:
             logger.debug(self.global_dbg + ifname + ": CTRL(global): " + cmd)
             return self.global_ctrl.request(cmd)
 
+    @property
+    def group_dbg(self):
+        if self._group_dbg is not None:
+            return self._group_dbg
+        if self.group_ifname is None:
+            raise Exception("Cannot have group_dbg without group_ifname")
+        if self.hostname is None:
+            self._group_dbg = self.group_ifname
+        else:
+            self._group_dbg = self.hostname + "/" + self.group_ifname
+        return self._group_dbg
+
     def group_request(self, cmd):
         if self.group_ifname and self.group_ifname != self.ifname:
-            logger.debug(self.group_ifname + ": CTRL: " + cmd)
-            gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
+            if self.hostname is None:
+                gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
+            else:
+                port = self.get_ctrl_iface_port(self.group_ifname)
+                gctrl = wpaspy.Ctrl(self.hostname, port)
+            logger.debug(self.group_dbg + ": CTRL(group): " + cmd)
             return gctrl.request(cmd)
         return self.request(cmd)
 
@@ -218,6 +258,10 @@ class WpaSupplicant:
         if not self.ping():
             logger.info("No PING response from " + self.ifname + " after reset")
 
+    def set(self, field, value):
+        if not "OK" in self.request("SET " + field + " " + value):
+            raise Exception("Failed to set wpa_supplicant parameter " + field)
+
     def add_network(self):
         id = self.request("ADD_NETWORK")
         if "FAIL" in id:
@@ -335,7 +379,8 @@ class WpaSupplicant:
         quoted = [ "realm", "username", "password", "domain", "imsi",
                    "excluded_ssid", "milenage", "ca_cert", "client_cert",
                    "private_key", "domain_suffix_match", "provisioning_sp",
-                   "roaming_partner", "phase1", "phase2" ]
+                   "roaming_partner", "phase1", "phase2", "private_key_passwd",
+                   "roaming_consortiums" ]
         for field in quoted:
             if field in params:
                 self.set_cred_quoted(id, field, params[field])
@@ -349,7 +394,7 @@ class WpaSupplicant:
             if field in params:
                 self.set_cred(id, field, params[field])
 
-        return id;
+        return id
 
     def select_network(self, id, freq=None):
         if freq:
@@ -373,7 +418,9 @@ class WpaSupplicant:
             raise Exception("MESH_GROUP_REMOVE failed")
         return None
 
-    def connect_network(self, id, timeout=10):
+    def connect_network(self, id, timeout=None):
+        if timeout is None:
+            timeout = 10 if self.hostname is None else 60
         self.dump_monitor()
         self.select_network(id)
         self.wait_connected(timeout=timeout)
@@ -574,7 +621,12 @@ class WpaSupplicant:
         res['ifname'] = s[2]
         self.group_ifname = s[2]
         try:
-            self.gctrl_mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
+            if self.hostname is None:
+                self.gctrl_mon = wpaspy.Ctrl(os.path.join(wpas_ctrl,
+                                                          self.group_ifname))
+            else:
+                port = self.get_ctrl_iface_port(self.group_ifname)
+                self.gctrl_mon = wpaspy.Ctrl(self.hostname, port)
             self.gctrl_mon.attach()
         except:
             logger.debug("Could not open monitor socket for group interface")
@@ -639,17 +691,19 @@ class WpaSupplicant:
             return None
         raise Exception("P2P_CONNECT (auth) failed")
 
-    def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
+    def p2p_go_neg_auth_result(self, timeout=None, expect_failure=False):
+        if timeout is None:
+            timeout = 1 if expect_failure else 5
         go_neg_res = None
         ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
-                                     "P2P-GO-NEG-FAILURE"], timeout);
+                                     "P2P-GO-NEG-FAILURE"], timeout)
         if ev is None:
             if expect_failure:
                 return None
             raise Exception("Group formation timed out")
         if "P2P-GO-NEG-SUCCESS" in ev:
             go_neg_res = ev
-            ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout);
+            ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
             if ev is None:
                 if expect_failure:
                     return None
@@ -755,7 +809,7 @@ class WpaSupplicant:
             while True:
                 while self.gctrl_mon.pending():
                     ev = self.gctrl_mon.recv()
-                    logger.debug(self.group_ifname + ": " + ev)
+                    logger.debug(self.group_dbg + "(group): " + ev)
                     for event in events:
                         if event in ev:
                             return ev
@@ -776,7 +830,8 @@ class WpaSupplicant:
             except:
                 pass
             self.gctrl_mon = None
-        ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=3)
+        timeout = 3 if self.hostname is None else 10
+        ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=timeout)
         if ev is None:
             raise Exception("Group removal event timed out")
         if "reason=GO_ENDING_SESSION" not in ev:
@@ -956,7 +1011,8 @@ class WpaSupplicant:
                    "private_key2", "phase1", "phase2", "domain_suffix_match",
                    "altsubject_match", "subject_match", "pac_file", "dh_file",
                    "bgscan", "ht_mcs", "id_str", "openssl_ciphers",
-                   "domain_match" ]
+                   "domain_match", "dpp_connector", "sae_password",
+                   "sae_password_id" ]
         for field in quoted:
             if field in kwargs and kwargs[field]:
                 self.set_network_quoted(id, field, kwargs[field])
@@ -971,7 +1027,12 @@ class WpaSupplicant:
                        "ht40_intolerant", "update_identifier", "mac_addr",
                        "erp", "bg_scan_period", "bssid_blacklist",
                        "bssid_whitelist", "mem_only_psk", "eap_workaround",
-                       "engine" ]
+                       "engine", "fils_dh_group", "bssid_hint",
+                       "dpp_csign", "dpp_csign_expiry",
+                       "dpp_netaccesskey", "dpp_netaccesskey_expiry",
+                       "group_mgmt", "owe_group",
+                       "roaming_consortium_selection", "ocv",
+                       "multi_ap_backhaul_sta", "rx_stbc", "tx_stbc" ]
         for field in not_quoted:
             if field in kwargs and kwargs[field]:
                 self.set_network(id, field, kwargs[field])
@@ -998,7 +1059,8 @@ class WpaSupplicant:
             self.select_network(id)
         return id
 
-    def scan(self, type=None, freq=None, no_wait=False, only_new=False):
+    def scan(self, type=None, freq=None, no_wait=False, only_new=False,
+             passive=False):
         if type:
             cmd = "SCAN TYPE=" + type
         else:
@@ -1007,21 +1069,28 @@ class WpaSupplicant:
             cmd = cmd + " freq=" + str(freq)
         if only_new:
             cmd += " only_new=1"
+        if passive:
+            cmd += " passive=1"
         if not no_wait:
             self.dump_monitor()
         if not "OK" in self.request(cmd):
             raise Exception("Failed to trigger scan")
         if no_wait:
             return
-        ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
+        ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS",
+                              "CTRL-EVENT-SCAN-FAILED"], 15)
         if ev is None:
             raise Exception("Scan timed out")
+        if "CTRL-EVENT-SCAN-FAILED" in ev:
+            raise Exception("Scan failed: " + ev)
 
-    def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False):
+    def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False,
+                     passive=False):
         if not force_scan and self.get_bss(bssid) is not None:
             return
         for i in range(0, 10):
-            self.scan(freq=freq, type="ONLY", only_new=only_new)
+            self.scan(freq=freq, type="ONLY", only_new=only_new,
+                      passive=passive)
             if self.get_bss(bssid) is not None:
                 return
         raise Exception("Could not find BSS " + bssid + " in scan")
@@ -1127,11 +1196,17 @@ class WpaSupplicant:
             if bssid not in l:
                 continue
             vals = dict()
-            [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
+            try:
+                [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
+                cache_id = None
+            except ValueError:
+                [index,aa,pmkid,expiration,opportunistic,cache_id] = l.split(' ')
             vals['index'] = index
             vals['pmkid'] = pmkid
             vals['expiration'] = expiration
             vals['opportunistic'] = opportunistic
+            if cache_id != None:
+                vals['cache_id'] = cache_id
             return vals
         return None
 
@@ -1202,7 +1277,9 @@ class WpaSupplicant:
             raise Exception(error)
         return ev
 
-    def wait_disconnected(self, timeout=10, error="Disconnection timed out"):
+    def wait_disconnected(self, timeout=None, error="Disconnection timed out"):
+        if timeout is None:
+            timeout = 10 if self.hostname is None else 30
         ev = self.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=timeout)
         if ev is None:
             raise Exception(error)
@@ -1239,3 +1316,17 @@ class WpaSupplicant:
         if "OK" not in self.global_request("%s %s adv_id=%s adv_mac=%s session=%d session_mac=%s %s" %
                                            (cmd, peer, adv_id, adv_mac, session_id, session_mac, params)):
             raise Exception("%s request failed" % cmd)
+
+    def note(self, txt):
+        self.request("NOTE " + txt)
+
+    def wait_regdom(self, country_ie=False):
+        for i in range(5):
+            ev = self.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+            if ev is None:
+                break
+            if country_ie:
+                if "init=COUNTRY_IE" in ev:
+                    break
+            else:
+                break