]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/wpasupplicant.py
tests: wpa_supplicant ctrl_iface
[thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
1 #!/usr/bin/python
2 #
3 # Python class for controlling wpa_supplicant
4 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
5 #
6 # This software may be distributed under the terms of the BSD license.
7 # See README for more details.
8
9 import os
10 import time
11 import logging
12 import re
13 import subprocess
14 import wpaspy
15
16 logger = logging.getLogger()
17 wpas_ctrl = '/var/run/wpa_supplicant'
18
19 class WpaSupplicant:
20 def __init__(self, ifname=None, global_iface=None):
21 self.group_ifname = None
22 if ifname:
23 self.set_ifname(ifname)
24 else:
25 self.ifname = None
26
27 self.global_iface = global_iface
28 if global_iface:
29 self.global_ctrl = wpaspy.Ctrl(global_iface)
30 self.global_mon = wpaspy.Ctrl(global_iface)
31 self.global_mon.attach()
32
33 def set_ifname(self, ifname):
34 self.ifname = ifname
35 self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
36 self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
37 self.mon.attach()
38
39 def remove_ifname(self):
40 if self.ifname:
41 self.mon.detach()
42 self.mon = None
43 self.ctrl = None
44 self.ifname = None
45
46 def interface_add(self, ifname, driver="nl80211", drv_params=None):
47 try:
48 groups = subprocess.check_output(["id"])
49 group = "admin" if "(admin)" in groups else "adm"
50 except Exception, e:
51 group = "admin"
52 cmd = "INTERFACE_ADD " + ifname + "\t\t" + driver + "\tDIR=/var/run/wpa_supplicant GROUP=" + group
53 if drv_params:
54 cmd = cmd + '\t' + drv_params
55 if "FAIL" in self.global_request(cmd):
56 raise Exception("Failed to add a dynamic wpa_supplicant interface")
57 self.set_ifname(ifname)
58
59 def interface_remove(self, ifname):
60 self.remove_ifname()
61 self.global_request("INTERFACE_REMOVE " + ifname)
62
63 def request(self, cmd):
64 logger.debug(self.ifname + ": CTRL: " + cmd)
65 return self.ctrl.request(cmd)
66
67 def global_request(self, cmd):
68 if self.global_iface is None:
69 self.request(cmd)
70 else:
71 ifname = self.ifname or self.global_iface
72 logger.debug(ifname + ": CTRL: " + cmd)
73 return self.global_ctrl.request(cmd)
74
75 def group_request(self, cmd):
76 if self.group_ifname and self.group_ifname != self.ifname:
77 logger.debug(self.group_ifname + ": CTRL: " + cmd)
78 gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
79 return gctrl.request(cmd)
80 return self.request(cmd)
81
82 def ping(self):
83 return "PONG" in self.request("PING")
84
85 def reset(self):
86 res = self.request("FLUSH")
87 if not "OK" in res:
88 logger.info("FLUSH to " + self.ifname + " failed: " + res)
89 self.request("SET external_sim 0")
90 self.request("SET hessid 00:00:00:00:00:00")
91 self.request("SET access_network_type 15")
92 self.request("SET p2p_add_cli_chan 0")
93 self.request("SET p2p_no_go_freq ")
94 self.request("SET p2p_pref_chan ")
95 self.request("SET p2p_no_group_iface 1")
96 self.request("SET p2p_go_intent 7")
97 self.group_ifname = None
98 self.dump_monitor()
99
100 iter = 0
101 while iter < 60:
102 state = self.get_driver_status_field("scan_state")
103 if "SCAN_STARTED" in state or "SCAN_REQUESTED" in state:
104 logger.info(self.ifname + ": Waiting for scan operation to complete before continuing")
105 time.sleep(1)
106 else:
107 break
108 iter = iter + 1
109 if iter == 60:
110 logger.error(self.ifname + ": Driver scan state did not clear")
111 print "Trying to clear cfg80211/mac80211 scan state"
112 try:
113 cmd = ["sudo", "ifconfig", self.ifname, "down"]
114 subprocess.call(cmd)
115 except subprocess.CalledProcessError, e:
116 logger.info("ifconfig failed: " + str(e.returncode))
117 logger.info(e.output)
118 try:
119 cmd = ["sudo", "ifconfig", self.ifname, "up"]
120 subprocess.call(cmd)
121 except subprocess.CalledProcessError, e:
122 logger.info("ifconfig failed: " + str(e.returncode))
123 logger.info(e.output)
124 if iter > 0:
125 # The ongoing scan could have discovered BSSes or P2P peers
126 logger.info("Run FLUSH again since scan was in progress")
127 self.request("FLUSH")
128 self.dump_monitor()
129
130 if not self.ping():
131 logger.info("No PING response from " + self.ifname + " after reset")
132
133 def add_network(self):
134 id = self.request("ADD_NETWORK")
135 if "FAIL" in id:
136 raise Exception("ADD_NETWORK failed")
137 return int(id)
138
139 def remove_network(self, id):
140 id = self.request("REMOVE_NETWORK " + str(id))
141 if "FAIL" in id:
142 raise Exception("REMOVE_NETWORK failed")
143 return None
144
145 def get_network(self, id, field):
146 res = self.request("GET_NETWORK " + str(id) + " " + field)
147 if res == "FAIL\n":
148 return None
149 return res
150
151 def set_network(self, id, field, value):
152 res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
153 if "FAIL" in res:
154 raise Exception("SET_NETWORK failed")
155 return None
156
157 def set_network_quoted(self, id, field, value):
158 res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
159 if "FAIL" in res:
160 raise Exception("SET_NETWORK failed")
161 return None
162
163 def list_networks(self):
164 res = self.request("LIST_NETWORKS")
165 lines = res.splitlines()
166 networks = []
167 for l in lines:
168 if "network id" in l:
169 continue
170 [id,ssid,bssid,flags] = l.split('\t')
171 network = {}
172 network['id'] = id
173 network['ssid'] = ssid
174 network['bssid'] = bssid
175 network['flags'] = flags
176 networks.append(network)
177 return networks
178
179 def hs20_enable(self):
180 self.request("SET interworking 1")
181 self.request("SET hs20 1")
182
183 def add_cred(self):
184 id = self.request("ADD_CRED")
185 if "FAIL" in id:
186 raise Exception("ADD_CRED failed")
187 return int(id)
188
189 def remove_cred(self, id):
190 id = self.request("REMOVE_CRED " + str(id))
191 if "FAIL" in id:
192 raise Exception("REMOVE_CRED failed")
193 return None
194
195 def set_cred(self, id, field, value):
196 res = self.request("SET_CRED " + str(id) + " " + field + " " + value)
197 if "FAIL" in res:
198 raise Exception("SET_CRED failed")
199 return None
200
201 def set_cred_quoted(self, id, field, value):
202 res = self.request("SET_CRED " + str(id) + " " + field + ' "' + value + '"')
203 if "FAIL" in res:
204 raise Exception("SET_CRED failed")
205 return None
206
207 def add_cred_values(self, params):
208 id = self.add_cred()
209
210 quoted = [ "realm", "username", "password", "domain", "imsi",
211 "excluded_ssid", "milenage", "ca_cert", "client_cert",
212 "private_key" ]
213 for field in quoted:
214 if field in params:
215 self.set_cred_quoted(id, field, params[field])
216
217 not_quoted = [ "eap", "roaming_consortium",
218 "required_roaming_consortium" ]
219 for field in not_quoted:
220 if field in params:
221 self.set_cred(id, field, params[field])
222
223 return id;
224
225 def select_network(self, id):
226 id = self.request("SELECT_NETWORK " + str(id))
227 if "FAIL" in id:
228 raise Exception("SELECT_NETWORK failed")
229 return None
230
231 def connect_network(self, id, timeout=10):
232 self.dump_monitor()
233 self.select_network(id)
234 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout)
235 if ev is None:
236 raise Exception("Association with the AP timed out")
237 self.dump_monitor()
238
239 def get_status(self):
240 res = self.request("STATUS")
241 lines = res.splitlines()
242 vals = dict()
243 for l in lines:
244 [name,value] = l.split('=', 1)
245 vals[name] = value
246 return vals
247
248 def get_status_field(self, field):
249 vals = self.get_status()
250 if field in vals:
251 return vals[field]
252 return None
253
254 def get_group_status(self):
255 res = self.group_request("STATUS")
256 lines = res.splitlines()
257 vals = dict()
258 for l in lines:
259 [name,value] = l.split('=', 1)
260 vals[name] = value
261 return vals
262
263 def get_group_status_field(self, field):
264 vals = self.get_group_status()
265 if field in vals:
266 return vals[field]
267 return None
268
269 def get_driver_status(self):
270 res = self.request("STATUS-DRIVER")
271 lines = res.splitlines()
272 vals = dict()
273 for l in lines:
274 [name,value] = l.split('=', 1)
275 vals[name] = value
276 return vals
277
278 def get_driver_status_field(self, field):
279 vals = self.get_driver_status()
280 if field in vals:
281 return vals[field]
282 return None
283
284 def p2p_dev_addr(self):
285 return self.get_status_field("p2p_device_address")
286
287 def p2p_interface_addr(self):
288 return self.get_group_status_field("address")
289
290 def p2p_listen(self):
291 return self.global_request("P2P_LISTEN")
292
293 def p2p_find(self, social=False, dev_id=None, dev_type=None):
294 cmd = "P2P_FIND"
295 if social:
296 cmd = cmd + " type=social"
297 if dev_id:
298 cmd = cmd + " dev_id=" + dev_id
299 if dev_type:
300 cmd = cmd + " dev_type=" + dev_type
301 return self.global_request(cmd)
302
303 def p2p_stop_find(self):
304 return self.global_request("P2P_STOP_FIND")
305
306 def wps_read_pin(self):
307 #TODO: make this random
308 self.pin = "12345670"
309 return self.pin
310
311 def peer_known(self, peer, full=True):
312 res = self.global_request("P2P_PEER " + peer)
313 if peer.lower() not in res.lower():
314 return False
315 if not full:
316 return True
317 return "[PROBE_REQ_ONLY]" not in res
318
319 def discover_peer(self, peer, full=True, timeout=15, social=True, force_find=False):
320 logger.info(self.ifname + ": Trying to discover peer " + peer)
321 if not force_find and self.peer_known(peer, full):
322 return True
323 self.p2p_find(social)
324 count = 0
325 while count < timeout:
326 time.sleep(1)
327 count = count + 1
328 if self.peer_known(peer, full):
329 return True
330 return False
331
332 def get_peer(self, peer):
333 res = self.global_request("P2P_PEER " + peer)
334 if peer.lower() not in res.lower():
335 raise Exception("Peer information not available")
336 lines = res.splitlines()
337 vals = dict()
338 for l in lines:
339 if '=' in l:
340 [name,value] = l.split('=', 1)
341 vals[name] = value
342 return vals
343
344 def group_form_result(self, ev, expect_failure=False, go_neg_res=None):
345 if expect_failure:
346 if "P2P-GROUP-STARTED" in ev:
347 raise Exception("Group formation succeeded when expecting failure")
348 exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)'
349 s = re.split(exp, ev)
350 if len(s) < 3:
351 return None
352 res = {}
353 res['result'] = 'go-neg-failed'
354 res['status'] = int(s[2])
355 return res
356
357 if "P2P-GROUP-STARTED" not in ev:
358 raise Exception("No P2P-GROUP-STARTED event seen")
359
360 exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*) ip_addr=([0-9.]*) ip_mask=([0-9.]*) go_ip_addr=([0-9.]*)'
361 s = re.split(exp, ev)
362 if len(s) < 11:
363 exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)'
364 s = re.split(exp, ev)
365 if len(s) < 8:
366 raise Exception("Could not parse P2P-GROUP-STARTED")
367 res = {}
368 res['result'] = 'success'
369 res['ifname'] = s[2]
370 self.group_ifname = s[2]
371 res['role'] = s[3]
372 res['ssid'] = s[4]
373 res['freq'] = s[5]
374 if "[PERSISTENT]" in ev:
375 res['persistent'] = True
376 else:
377 res['persistent'] = False
378 p = re.match(r'psk=([0-9a-f]*)', s[6])
379 if p:
380 res['psk'] = p.group(1)
381 p = re.match(r'passphrase="(.*)"', s[6])
382 if p:
383 res['passphrase'] = p.group(1)
384 res['go_dev_addr'] = s[7]
385
386 if len(s) > 8 and len(s[8]) > 0:
387 res['ip_addr'] = s[8]
388 if len(s) > 9:
389 res['ip_mask'] = s[9]
390 if len(s) > 10:
391 res['go_ip_addr'] = s[10]
392
393 if go_neg_res:
394 exp = r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)'
395 s = re.split(exp, go_neg_res)
396 if len(s) < 4:
397 raise Exception("Could not parse P2P-GO-NEG-SUCCESS")
398 res['go_neg_role'] = s[2]
399 res['go_neg_freq'] = s[3]
400
401 return res
402
403 def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False, freq=None):
404 if not self.discover_peer(peer):
405 raise Exception("Peer " + peer + " not found")
406 self.dump_monitor()
407 cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth"
408 if go_intent:
409 cmd = cmd + ' go_intent=' + str(go_intent)
410 if freq:
411 cmd = cmd + ' freq=' + str(freq)
412 if persistent:
413 cmd = cmd + " persistent"
414 if "OK" in self.global_request(cmd):
415 return None
416 raise Exception("P2P_CONNECT (auth) failed")
417
418 def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
419 go_neg_res = None
420 ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
421 "P2P-GO-NEG-FAILURE"], timeout);
422 if ev is None:
423 if expect_failure:
424 return None
425 raise Exception("Group formation timed out")
426 if "P2P-GO-NEG-SUCCESS" in ev:
427 go_neg_res = ev
428 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout);
429 if ev is None:
430 if expect_failure:
431 return None
432 raise Exception("Group formation timed out")
433 self.dump_monitor()
434 return self.group_form_result(ev, expect_failure, go_neg_res)
435
436 def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False, persistent=False, freq=None):
437 if not self.discover_peer(peer):
438 raise Exception("Peer " + peer + " not found")
439 self.dump_monitor()
440 if pin:
441 cmd = "P2P_CONNECT " + peer + " " + pin + " " + method
442 else:
443 cmd = "P2P_CONNECT " + peer + " " + method
444 if go_intent:
445 cmd = cmd + ' go_intent=' + str(go_intent)
446 if freq:
447 cmd = cmd + ' freq=' + str(freq)
448 if persistent:
449 cmd = cmd + " persistent"
450 if "OK" in self.global_request(cmd):
451 if timeout == 0:
452 self.dump_monitor()
453 return None
454 go_neg_res = None
455 ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
456 "P2P-GO-NEG-FAILURE"], timeout)
457 if ev is None:
458 if expect_failure:
459 return None
460 raise Exception("Group formation timed out")
461 if "P2P-GO-NEG-SUCCESS" in ev:
462 go_neg_res = ev
463 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
464 if ev is None:
465 if expect_failure:
466 return None
467 raise Exception("Group formation timed out")
468 self.dump_monitor()
469 return self.group_form_result(ev, expect_failure, go_neg_res)
470 raise Exception("P2P_CONNECT failed")
471
472 def wait_event(self, events, timeout=10):
473 start = os.times()[4]
474 while True:
475 while self.mon.pending():
476 ev = self.mon.recv()
477 logger.debug(self.ifname + ": " + ev)
478 for event in events:
479 if event in ev:
480 return ev
481 now = os.times()[4]
482 remaining = start + timeout - now
483 if remaining <= 0:
484 break
485 if not self.mon.pending(timeout=remaining):
486 break
487 return None
488
489 def wait_global_event(self, events, timeout):
490 if self.global_iface is None:
491 self.wait_event(events, timeout)
492 else:
493 start = os.times()[4]
494 while True:
495 while self.global_mon.pending():
496 ev = self.global_mon.recv()
497 logger.debug(self.ifname + "(global): " + ev)
498 for event in events:
499 if event in ev:
500 return ev
501 now = os.times()[4]
502 remaining = start + timeout - now
503 if remaining <= 0:
504 break
505 if not self.global_mon.pending(timeout=remaining):
506 break
507 return None
508
509 def wait_go_ending_session(self):
510 ev = self.wait_event(["P2P-GROUP-REMOVED"], timeout=3)
511 if ev is None:
512 raise Exception("Group removal event timed out")
513 if "reason=GO_ENDING_SESSION" not in ev:
514 raise Exception("Unexpected group removal reason")
515
516 def dump_monitor(self):
517 while self.mon.pending():
518 ev = self.mon.recv()
519 logger.debug(self.ifname + ": " + ev)
520 while self.global_mon.pending():
521 ev = self.global_mon.recv()
522 logger.debug(self.ifname + "(global): " + ev)
523
524 def remove_group(self, ifname=None):
525 if ifname is None:
526 ifname = self.group_ifname if self.group_ifname else self.ifname
527 if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname):
528 raise Exception("Group could not be removed")
529 self.group_ifname = None
530
531 def p2p_start_go(self, persistent=None, freq=None):
532 self.dump_monitor()
533 cmd = "P2P_GROUP_ADD"
534 if persistent is None:
535 pass
536 elif persistent is True:
537 cmd = cmd + " persistent"
538 else:
539 cmd = cmd + " persistent=" + str(persistent)
540 if freq:
541 cmd = cmd + " freq=" + str(freq)
542 if "OK" in self.global_request(cmd):
543 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5)
544 if ev is None:
545 raise Exception("GO start up timed out")
546 self.dump_monitor()
547 return self.group_form_result(ev)
548 raise Exception("P2P_GROUP_ADD failed")
549
550 def p2p_go_authorize_client(self, pin):
551 cmd = "WPS_PIN any " + pin
552 if "FAIL" in self.group_request(cmd):
553 raise Exception("Failed to authorize client connection on GO")
554 return None
555
556 def p2p_go_authorize_client_pbc(self):
557 cmd = "WPS_PBC"
558 if "FAIL" in self.group_request(cmd):
559 raise Exception("Failed to authorize client connection on GO")
560 return None
561
562 def p2p_connect_group(self, go_addr, pin, timeout=0, social=False):
563 self.dump_monitor()
564 if not self.discover_peer(go_addr, social=social):
565 raise Exception("GO " + go_addr + " not found")
566 self.dump_monitor()
567 cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
568 if "OK" in self.global_request(cmd):
569 if timeout == 0:
570 self.dump_monitor()
571 return None
572 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
573 if ev is None:
574 raise Exception("Joining the group timed out")
575 self.dump_monitor()
576 return self.group_form_result(ev)
577 raise Exception("P2P_CONNECT(join) failed")
578
579 def tdls_setup(self, peer):
580 cmd = "TDLS_SETUP " + peer
581 if "FAIL" in self.group_request(cmd):
582 raise Exception("Failed to request TDLS setup")
583 return None
584
585 def tdls_teardown(self, peer):
586 cmd = "TDLS_TEARDOWN " + peer
587 if "FAIL" in self.group_request(cmd):
588 raise Exception("Failed to request TDLS teardown")
589 return None
590
591 def connect(self, ssid=None, ssid2=None, psk=None, proto=None,
592 key_mgmt=None, wep_key0=None,
593 ieee80211w=None, pairwise=None, group=None, scan_freq=None,
594 eap=None, identity=None, anonymous_identity=None,
595 password=None, phase1=None, phase2=None, ca_cert=None,
596 domain_suffix_match=None, password_hex=None,
597 client_cert=None, private_key=None, peerkey=False, okc=False,
598 eapol_flags=None, fragment_size=None,
599 wait_connect=True, only_add_network=False,
600 ca_cert2=None, client_cert2=None, private_key2=None,
601 scan_ssid=None, raw_psk=None, pac_file=None,
602 subject_match=None, altsubject_match=None,
603 private_key_passwd=None, ocsp=None, auth_alg=None):
604 logger.info("Connect STA " + self.ifname + " to AP")
605 id = self.add_network()
606 if ssid:
607 self.set_network_quoted(id, "ssid", ssid)
608 elif ssid2:
609 self.set_network(id, "ssid", ssid2)
610 if psk:
611 self.set_network_quoted(id, "psk", psk)
612 if raw_psk:
613 self.set_network(id, "psk", raw_psk)
614 if proto:
615 self.set_network(id, "proto", proto)
616 if key_mgmt:
617 self.set_network(id, "key_mgmt", key_mgmt)
618 if ieee80211w:
619 self.set_network(id, "ieee80211w", ieee80211w)
620 if pairwise:
621 self.set_network(id, "pairwise", pairwise)
622 if group:
623 self.set_network(id, "group", group)
624 if wep_key0:
625 self.set_network(id, "wep_key0", wep_key0)
626 if scan_freq:
627 self.set_network(id, "scan_freq", scan_freq)
628 if eap:
629 self.set_network(id, "eap", eap)
630 if identity:
631 self.set_network_quoted(id, "identity", identity)
632 if anonymous_identity:
633 self.set_network_quoted(id, "anonymous_identity",
634 anonymous_identity)
635 if password:
636 self.set_network_quoted(id, "password", password)
637 if password_hex:
638 self.set_network(id, "password", password_hex)
639 if ca_cert:
640 self.set_network_quoted(id, "ca_cert", ca_cert)
641 if client_cert:
642 self.set_network_quoted(id, "client_cert", client_cert)
643 if private_key:
644 self.set_network_quoted(id, "private_key", private_key)
645 if private_key_passwd:
646 self.set_network_quoted(id, "private_key_passwd",
647 private_key_passwd)
648 if ca_cert2:
649 self.set_network_quoted(id, "ca_cert2", ca_cert2)
650 if client_cert2:
651 self.set_network_quoted(id, "client_cert2", client_cert2)
652 if private_key2:
653 self.set_network_quoted(id, "private_key2", private_key2)
654 if phase1:
655 self.set_network_quoted(id, "phase1", phase1)
656 if phase2:
657 self.set_network_quoted(id, "phase2", phase2)
658 if domain_suffix_match:
659 self.set_network_quoted(id, "domain_suffix_match",
660 domain_suffix_match)
661 if altsubject_match:
662 self.set_network_quoted(id, "altsubject_match",
663 altsubject_match)
664 if subject_match:
665 self.set_network_quoted(id, "subject_match",
666 subject_match)
667 if peerkey:
668 self.set_network(id, "peerkey", "1")
669 if okc:
670 self.set_network(id, "proactive_key_caching", "1")
671 if eapol_flags:
672 self.set_network(id, "eapol_flags", eapol_flags)
673 if fragment_size:
674 self.set_network(id, "fragment_size", fragment_size)
675 if scan_ssid:
676 self.set_network(id, "scan_ssid", scan_ssid)
677 if pac_file:
678 self.set_network_quoted(id, "pac_file", pac_file)
679 if ocsp:
680 self.set_network(id, "ocsp", str(ocsp))
681 if auth_alg:
682 self.set_network(id, "auth_alg", auth_alg)
683 if only_add_network:
684 return id
685 if wait_connect:
686 if eap:
687 self.connect_network(id, timeout=20)
688 else:
689 self.connect_network(id)
690 else:
691 self.dump_monitor()
692 self.select_network(id)
693 return id
694
695 def scan(self, type=None, freq=None, no_wait=False):
696 if type:
697 cmd = "SCAN TYPE=" + type
698 else:
699 cmd = "SCAN"
700 if freq:
701 cmd = cmd + " freq=" + freq
702 if not no_wait:
703 self.dump_monitor()
704 if not "OK" in self.request(cmd):
705 raise Exception("Failed to trigger scan")
706 if no_wait:
707 return
708 ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
709 if ev is None:
710 raise Exception("Scan timed out")
711
712 def roam(self, bssid):
713 self.dump_monitor()
714 self.request("ROAM " + bssid)
715 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
716 if ev is None:
717 raise Exception("Roaming with the AP timed out")
718 self.dump_monitor()
719
720 def roam_over_ds(self, bssid):
721 self.dump_monitor()
722 self.request("FT_DS " + bssid)
723 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
724 if ev is None:
725 raise Exception("Roaming with the AP timed out")
726 self.dump_monitor()
727
728 def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None,
729 new_passphrase=None, no_wait=False):
730 self.dump_monitor()
731 if new_ssid:
732 self.request("WPS_REG " + bssid + " " + pin + " " +
733 new_ssid.encode("hex") + " " + key_mgmt + " " +
734 cipher + " " + new_passphrase.encode("hex"))
735 if no_wait:
736 return
737 ev = self.wait_event(["WPS-SUCCESS"], timeout=15)
738 else:
739 self.request("WPS_REG " + bssid + " " + pin)
740 if no_wait:
741 return
742 ev = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15)
743 if ev is None:
744 raise Exception("WPS cred timed out")
745 ev = self.wait_event(["WPS-FAIL"], timeout=15)
746 if ev is None:
747 raise Exception("WPS timed out")
748 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
749 if ev is None:
750 raise Exception("Association with the AP timed out")
751
752 def relog(self):
753 self.request("RELOG")
754
755 def wait_completed(self, timeout=10):
756 for i in range(0, timeout * 2):
757 if self.get_status_field("wpa_state") == "COMPLETED":
758 return
759 time.sleep(0.5)
760 raise Exception("Timeout while waiting for COMPLETED state")
761
762 def get_capability(self, field):
763 res = self.request("GET_CAPABILITY " + field)
764 if "FAIL" in res:
765 return None
766 return res.split(' ')
767
768 def get_bss(self, bssid):
769 res = self.request("BSS " + bssid)
770 lines = res.splitlines()
771 vals = dict()
772 for l in lines:
773 [name,value] = l.split('=', 1)
774 vals[name] = value
775 return vals
776
777 def get_pmksa(self, bssid):
778 res = self.request("PMKSA")
779 lines = res.splitlines()
780 for l in lines:
781 if bssid not in l:
782 continue
783 vals = dict()
784 [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
785 vals['index'] = index
786 vals['pmkid'] = pmkid
787 vals['expiration'] = expiration
788 vals['opportunistic'] = opportunistic
789 return vals
790 return None
791
792 def get_sta(self, addr, info=None, next=False):
793 cmd = "STA-NEXT " if next else "STA "
794 if addr is None:
795 res = self.request("STA-FIRST")
796 elif info:
797 res = self.request(cmd + addr + " " + info)
798 else:
799 res = self.request(cmd + addr)
800 lines = res.splitlines()
801 vals = dict()
802 first = True
803 for l in lines:
804 if first:
805 vals['addr'] = l
806 first = False
807 else:
808 [name,value] = l.split('=', 1)
809 vals[name] = value
810 return vals