class WpaSupplicant:
def __init__(self, ifname=None, global_iface=None, hostname=None,
- port=9877, global_port=9878):
+ port=9877, global_port=9878, monitor=True):
+ self.monitor = monitor
self.hostname = hostname
self.group_ifname = None
self.gctrl_mon = None
self.global_iface = global_iface
if global_iface:
+ self.global_mon = None
if hostname != None:
self.global_ctrl = wpaspy.Ctrl(hostname, global_port)
- self.global_mon = wpaspy.Ctrl(hostname, global_port)
+ if self.monitor:
+ self.global_mon = wpaspy.Ctrl(hostname, global_port)
self.global_dbg = hostname + "/" + str(global_port) + "/"
else:
self.global_ctrl = wpaspy.Ctrl(global_iface)
- self.global_mon = wpaspy.Ctrl(global_iface)
+ if self.monitor:
+ self.global_mon = wpaspy.Ctrl(global_iface)
self.global_dbg = ""
- self.global_mon.attach()
+ if self.monitor:
+ self.global_mon.attach()
else:
self.global_mon = None
self.ifname = ifname
if hostname != None:
self.ctrl = wpaspy.Ctrl(hostname, port)
- self.mon = wpaspy.Ctrl(hostname, port)
+ if self.monitor:
+ self.mon = wpaspy.Ctrl(hostname, port)
self.host = remotehost.Host(hostname, ifname)
self.dbg = hostname + "/" + ifname
else:
self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
- self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
+ if self.monitor:
+ self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
self.dbg = ifname
- self.mon.attach()
+ if self.monitor:
+ self.mon.attach()
def remove_ifname(self):
if self.ifname:
if not self.ping():
logger.info("No PING response from " + self.ifname + " after reset")
+ def set(self, field, value):
+ if not "OK" in self.request("SET " + field + " " + value):
+ raise Exception("Failed to set wpa_supplicant parameter " + field)
+
def add_network(self):
id = self.request("ADD_NETWORK")
if "FAIL" in id:
quoted = [ "realm", "username", "password", "domain", "imsi",
"excluded_ssid", "milenage", "ca_cert", "client_cert",
"private_key", "domain_suffix_match", "provisioning_sp",
- "roaming_partner", "phase1", "phase2", "private_key_passwd" ]
+ "roaming_partner", "phase1", "phase2", "private_key_passwd",
+ "roaming_consortiums" ]
for field in quoted:
if field in params:
self.set_cred_quoted(id, field, params[field])
return None
raise Exception("P2P_CONNECT (auth) failed")
- def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
+ def p2p_go_neg_auth_result(self, timeout=None, expect_failure=False):
+ if timeout is None:
+ timeout = 1 if expect_failure else 5
go_neg_res = None
ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
"P2P-GO-NEG-FAILURE"], timeout)
except:
pass
self.gctrl_mon = None
- ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=3)
+ timeout = 3 if self.hostname is None else 10
+ ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=timeout)
if ev is None:
raise Exception("Group removal event timed out")
if "reason=GO_ENDING_SESSION" not in ev:
"private_key2", "phase1", "phase2", "domain_suffix_match",
"altsubject_match", "subject_match", "pac_file", "dh_file",
"bgscan", "ht_mcs", "id_str", "openssl_ciphers",
- "domain_match" ]
+ "domain_match", "dpp_connector", "sae_password",
+ "sae_password_id" ]
for field in quoted:
if field in kwargs and kwargs[field]:
self.set_network_quoted(id, field, kwargs[field])
"ht40_intolerant", "update_identifier", "mac_addr",
"erp", "bg_scan_period", "bssid_blacklist",
"bssid_whitelist", "mem_only_psk", "eap_workaround",
- "engine" ]
+ "engine", "fils_dh_group", "bssid_hint",
+ "dpp_csign", "dpp_csign_expiry",
+ "dpp_netaccesskey", "dpp_netaccesskey_expiry",
+ "group_mgmt", "owe_group",
+ "roaming_consortium_selection", "ocv",
+ "multi_ap_backhaul_sta", "rx_stbc", "tx_stbc" ]
for field in not_quoted:
if field in kwargs and kwargs[field]:
self.set_network(id, field, kwargs[field])
self.select_network(id)
return id
- def scan(self, type=None, freq=None, no_wait=False, only_new=False):
+ def scan(self, type=None, freq=None, no_wait=False, only_new=False,
+ passive=False):
if type:
cmd = "SCAN TYPE=" + type
else:
cmd = cmd + " freq=" + str(freq)
if only_new:
cmd += " only_new=1"
+ if passive:
+ cmd += " passive=1"
if not no_wait:
self.dump_monitor()
if not "OK" in self.request(cmd):
raise Exception("Failed to trigger scan")
if no_wait:
return
- ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
+ ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS",
+ "CTRL-EVENT-SCAN-FAILED"], 15)
if ev is None:
raise Exception("Scan timed out")
+ if "CTRL-EVENT-SCAN-FAILED" in ev:
+ raise Exception("Scan failed: " + ev)
- def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False):
+ def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False,
+ passive=False):
if not force_scan and self.get_bss(bssid) is not None:
return
for i in range(0, 10):
- self.scan(freq=freq, type="ONLY", only_new=only_new)
+ self.scan(freq=freq, type="ONLY", only_new=only_new,
+ passive=passive)
if self.get_bss(bssid) is not None:
return
raise Exception("Could not find BSS " + bssid + " in scan")
if bssid not in l:
continue
vals = dict()
- [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
+ try:
+ [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
+ cache_id = None
+ except ValueError:
+ [index,aa,pmkid,expiration,opportunistic,cache_id] = l.split(' ')
vals['index'] = index
vals['pmkid'] = pmkid
vals['expiration'] = expiration
vals['opportunistic'] = opportunistic
+ if cache_id != None:
+ vals['cache_id'] = cache_id
return vals
return None
if "OK" not in self.global_request("%s %s adv_id=%s adv_mac=%s session=%d session_mac=%s %s" %
(cmd, peer, adv_id, adv_mac, session_id, session_mac, params)):
raise Exception("%s request failed" % cmd)
+
+ def note(self, txt):
+ self.request("NOTE " + txt)
+
+ def wait_regdom(self, country_ie=False):
+ for i in range(5):
+ ev = self.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
+ if ev is None:
+ break
+ if country_ie:
+ if "init=COUNTRY_IE" in ev:
+ break
+ else:
+ break