# Source: git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/wpasupplicant.py
# Commit: tests: HS 2.0 roaming partner preference
# Path: [thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
# Python class for controlling wpa_supplicant
# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
6
import binascii
import logging
import os
import re
import subprocess
import time

import wpaspy
13
# Root logger; all control-interface traffic and events are logged through it.
logger = logging.getLogger()
# Directory holding the per-interface wpa_supplicant control sockets.
wpas_ctrl = '/var/run/wpa_supplicant'
16
class WpaSupplicant:
    """Drive one wpa_supplicant interface through its control sockets.

    Each attached interface gets two wpaspy.Ctrl connections: ``ctrl`` for
    request/response commands and ``mon`` (attached) for unsolicited events.
    When ``global_iface`` is given, the same pair is opened on the global
    control interface (``global_ctrl`` / ``global_mon``).
    """

    # Commands issued by reset() after FLUSH. The trailing space in the
    # p2p_no_go_freq / p2p_pref_chan commands is intentional (empty value).
    _RESET_COMMANDS = ("WPS_ER_STOP",
                       "SET pmf 0",
                       "SET external_sim 0",
                       "SET hessid 00:00:00:00:00:00",
                       "SET access_network_type 15",
                       "SET p2p_add_cli_chan 0",
                       "SET p2p_no_go_freq ",
                       "SET p2p_pref_chan ",
                       "SET p2p_no_group_iface 1",
                       "SET p2p_go_intent 7")

    # Credential fields sent with / without surrounding double quotes.
    _CRED_QUOTED = ("realm", "username", "password", "domain", "imsi",
                    "excluded_ssid", "milenage", "ca_cert", "client_cert",
                    "private_key", "domain_suffix_match", "provisioning_sp",
                    "roaming_partner")
    _CRED_NOT_QUOTED = ("eap", "roaming_consortium",
                        "required_roaming_consortium", "sp_priority")

    # Network fields sent with / without surrounding double quotes.
    _NET_QUOTED = ("psk", "identity", "anonymous_identity", "password",
                   "ca_cert", "client_cert", "private_key",
                   "private_key_passwd", "ca_cert2", "client_cert2",
                   "private_key2", "phase1", "phase2", "domain_suffix_match",
                   "altsubject_match", "subject_match", "pac_file", "dh_file")
    _NET_NOT_QUOTED = ("proto", "key_mgmt", "ieee80211w", "pairwise",
                       "group", "wep_key0", "scan_freq", "eap",
                       "eapol_flags", "fragment_size", "scan_ssid", "auth_alg")

    # P2P-GROUP-STARTED event; the IP suffix is only present in some events.
    _GROUP_STARTED_RE = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)'
    _GROUP_STARTED_IP_RE = r' ip_addr=([0-9.]*) ip_mask=([0-9.]*) go_ip_addr=([0-9.]*)'

    def __init__(self, ifname=None, global_iface=None):
        """Attach to ifname (if given) and to the global interface (if given)."""
        self.group_ifname = None
        # Always define the socket attributes so that helpers such as
        # dump_monitor() can test for "not attached" instead of crashing.
        self.ctrl = None
        self.mon = None
        self.global_ctrl = None
        self.global_mon = None
        if ifname:
            self.set_ifname(ifname)
        else:
            self.ifname = None

        self.global_iface = global_iface
        if global_iface:
            self.global_ctrl = wpaspy.Ctrl(global_iface)
            self.global_mon = wpaspy.Ctrl(global_iface)
            self.global_mon.attach()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _global_label(self):
        """Return the log prefix used for global control interface traffic."""
        return (self.ifname or self.global_iface) + "(global)"

    def _parse_key_values(self, res, what):
        """Parse name=value lines of res; log and skip any other line."""
        vals = dict()
        for line in res.splitlines():
            name, sep, value = line.partition('=')
            if not sep:
                logger.info(self.ifname + ": Ignore unexpected " + what +
                            " line: " + line)
                continue
            vals[name] = value
        return vals

    def _wait_for_event(self, mon, label, events, timeout):
        """Return the first event from mon containing any of events, or None.

        Gives up once timeout seconds have elapsed or the socket stays idle
        for the remaining time.
        """
        start = os.times()[4]
        while True:
            while mon.pending():
                ev = mon.recv()
                logger.debug(label + ": " + ev)
                if any(event in ev for event in events):
                    return ev
            remaining = start + timeout - os.times()[4]
            if remaining <= 0:
                break
            if not mon.pending(timeout=remaining):
                break
        return None

    def _wait_go_neg_result(self, timeout, expect_failure):
        """Wait for GO negotiation and group start; return the parsed result.

        Returns None on timeout when expect_failure is set; raises otherwise.
        """
        go_neg_res = None
        ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
                                     "P2P-GO-NEG-FAILURE"], timeout)
        if ev is None:
            if expect_failure:
                return None
            raise Exception("Group formation timed out")
        if "P2P-GO-NEG-SUCCESS" in ev:
            go_neg_res = ev
            ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
            if ev is None:
                if expect_failure:
                    return None
                raise Exception("Group formation timed out")
        self.dump_monitor()
        return self.group_form_result(ev, expect_failure, go_neg_res)

    @staticmethod
    def _hex(value):
        """Return the hex string of value (text is UTF-8 encoded first)."""
        if not isinstance(value, bytes):
            value = value.encode("utf-8")
        return str(binascii.hexlify(value).decode("ascii"))

    # ------------------------------------------------------------------
    # Interface management and raw requests
    # ------------------------------------------------------------------

    def set_ifname(self, ifname):
        """Open command and monitor sockets for ifname."""
        self.ifname = ifname
        path = os.path.join(wpas_ctrl, ifname)
        self.ctrl = wpaspy.Ctrl(path)
        self.mon = wpaspy.Ctrl(path)
        self.mon.attach()

    def remove_ifname(self):
        """Detach from the current interface, if any."""
        if self.ifname:
            self.mon.detach()
            self.mon = None
            self.ctrl = None
            self.ifname = None

    def interface_add(self, ifname, driver="nl80211", drv_params=None):
        """Add a dynamic interface through the global interface and attach.

        Raises Exception if wpa_supplicant reports FAIL.
        """
        try:
            groups = subprocess.check_output(["id"])
            # check_output() returns bytes on Python 3
            if not isinstance(groups, str):
                groups = groups.decode()
            group = "admin" if "(admin)" in groups else "adm"
        except Exception:
            group = "admin"
        cmd = "INTERFACE_ADD " + ifname + "\t\t" + driver + "\tDIR=/var/run/wpa_supplicant GROUP=" + group
        if drv_params:
            cmd = cmd + '\t' + drv_params
        if "FAIL" in self.global_request(cmd):
            raise Exception("Failed to add a dynamic wpa_supplicant interface")
        self.set_ifname(ifname)

    def interface_remove(self, ifname):
        """Detach locally and ask wpa_supplicant to remove ifname."""
        self.remove_ifname()
        self.global_request("INTERFACE_REMOVE " + ifname)

    def request(self, cmd):
        """Send cmd on the interface control socket and return the reply."""
        logger.debug(self.ifname + ": CTRL: " + cmd)
        return self.ctrl.request(cmd)

    def global_request(self, cmd):
        """Send cmd on the global socket (or the interface one if none)."""
        if self.global_iface is None:
            return self.request(cmd)
        ifname = self.ifname or self.global_iface
        logger.debug(ifname + ": CTRL: " + cmd)
        return self.global_ctrl.request(cmd)

    def group_request(self, cmd):
        """Send cmd to the P2P group interface when it is a separate one."""
        if self.group_ifname and self.group_ifname != self.ifname:
            logger.debug(self.group_ifname + ": CTRL: " + cmd)
            gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
            return gctrl.request(cmd)
        return self.request(cmd)

    def ping(self):
        """Return True if the interface answers PING with PONG."""
        return "PONG" in self.request("PING")

    def reset(self):
        """Flush state, restore the default settings and wait out any scan."""
        res = self.request("FLUSH")
        if "OK" not in res:
            logger.info("FLUSH to " + self.ifname + " failed: " + res)
        for cmd in self._RESET_COMMANDS:
            self.request(cmd)
        self.group_ifname = None
        self.dump_monitor()

        attempts = 0
        while attempts < 60:
            # scan_state may be missing from the driver status
            state = self.get_driver_status_field("scan_state") or ""
            if "SCAN_STARTED" in state or "SCAN_REQUESTED" in state:
                logger.info(self.ifname + ": Waiting for scan operation to complete before continuing")
                time.sleep(1)
            else:
                break
            attempts = attempts + 1
        if attempts == 60:
            logger.error(self.ifname + ": Driver scan state did not clear")
            print("Trying to clear cfg80211/mac80211 scan state")
            for link_state in ("down", "up"):
                # subprocess.call() reports failure through its return value;
                # OSError covers the command not being runnable at all.
                try:
                    ret = subprocess.call(["sudo", "ifconfig", self.ifname,
                                           link_state])
                    if ret != 0:
                        logger.info("ifconfig failed: " + str(ret))
                except OSError as e:
                    logger.info("ifconfig failed: " + str(e))
        if attempts > 0:
            # The ongoing scan could have discovered BSSes or P2P peers
            logger.info("Run FLUSH again since scan was in progress")
            self.request("FLUSH")
            self.dump_monitor()

        if not self.ping():
            logger.info("No PING response from " + self.ifname + " after reset")

    # ------------------------------------------------------------------
    # Network and credential configuration
    # ------------------------------------------------------------------

    def add_network(self):
        """Add a network block and return its id as int."""
        res = self.request("ADD_NETWORK")
        if "FAIL" in res:
            raise Exception("ADD_NETWORK failed")
        return int(res)

    def remove_network(self, id):
        """Remove network id; raise Exception on FAIL."""
        res = self.request("REMOVE_NETWORK " + str(id))
        if "FAIL" in res:
            raise Exception("REMOVE_NETWORK failed")
        return None

    def get_network(self, id, field):
        """Return the raw GET_NETWORK reply, or None if it is FAIL."""
        res = self.request("GET_NETWORK " + str(id) + " " + field)
        if res == "FAIL\n":
            return None
        return res

    def set_network(self, id, field, value):
        """Set an unquoted network field; raise Exception on FAIL."""
        res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
        if "FAIL" in res:
            raise Exception("SET_NETWORK failed")
        return None

    def set_network_quoted(self, id, field, value):
        """Set a network field with value wrapped in double quotes."""
        res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
        if "FAIL" in res:
            raise Exception("SET_NETWORK failed")
        return None

    def list_networks(self):
        """Return LIST_NETWORKS rows as dicts (id, ssid, bssid, flags)."""
        networks = []
        for line in self.request("LIST_NETWORKS").splitlines():
            # Skip the header row and blank lines
            if not line or "network id" in line:
                continue
            netid, ssid, bssid, flags = line.split('\t')
            networks.append({'id': netid, 'ssid': ssid,
                             'bssid': bssid, 'flags': flags})
        return networks

    def hs20_enable(self):
        """Enable Interworking and Hotspot 2.0."""
        self.request("SET interworking 1")
        self.request("SET hs20 1")

    def add_cred(self):
        """Add a credential and return its id as int."""
        res = self.request("ADD_CRED")
        if "FAIL" in res:
            raise Exception("ADD_CRED failed")
        return int(res)

    def remove_cred(self, id):
        """Remove credential id; raise Exception on FAIL."""
        res = self.request("REMOVE_CRED " + str(id))
        if "FAIL" in res:
            raise Exception("REMOVE_CRED failed")
        return None

    def set_cred(self, id, field, value):
        """Set an unquoted credential field; raise Exception on FAIL."""
        res = self.request("SET_CRED " + str(id) + " " + field + " " + value)
        if "FAIL" in res:
            raise Exception("SET_CRED failed")
        return None

    def set_cred_quoted(self, id, field, value):
        """Set a credential field with value wrapped in double quotes."""
        res = self.request("SET_CRED " + str(id) + " " + field + ' "' + value + '"')
        if "FAIL" in res:
            raise Exception("SET_CRED failed")
        return None

    def add_cred_values(self, params):
        """Add a credential populated from the params dict; return its id."""
        cred = self.add_cred()

        for field in self._CRED_QUOTED:
            if field in params:
                self.set_cred_quoted(cred, field, params[field])

        for field in self._CRED_NOT_QUOTED:
            if field in params:
                self.set_cred(cred, field, params[field])

        return cred

    def select_network(self, id):
        """Select network id; raise Exception on FAIL."""
        res = self.request("SELECT_NETWORK " + str(id))
        if "FAIL" in res:
            raise Exception("SELECT_NETWORK failed")
        return None

    def connect_network(self, id, timeout=10):
        """Select network id and wait for CTRL-EVENT-CONNECTED."""
        self.dump_monitor()
        self.select_network(id)
        ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout)
        if ev is None:
            raise Exception("Association with the AP timed out")
        self.dump_monitor()

    # ------------------------------------------------------------------
    # Status queries
    # ------------------------------------------------------------------

    def get_status(self):
        """Return the STATUS reply as a dict."""
        return self._parse_key_values(self.request("STATUS"), "STATUS")

    def get_status_field(self, field):
        """Return one STATUS field, or None if absent."""
        return self.get_status().get(field)

    def get_group_status(self):
        """Return the group interface STATUS reply as a dict."""
        return self._parse_key_values(self.group_request("STATUS"), "STATUS")

    def get_group_status_field(self, field):
        """Return one group STATUS field, or None if absent."""
        return self.get_group_status().get(field)

    def get_driver_status(self):
        """Return the STATUS-DRIVER reply as a dict."""
        return self._parse_key_values(self.request("STATUS-DRIVER"),
                                      "STATUS-DRIVER")

    def get_driver_status_field(self, field):
        """Return one STATUS-DRIVER field, or None if absent."""
        return self.get_driver_status().get(field)

    # ------------------------------------------------------------------
    # P2P
    # ------------------------------------------------------------------

    def p2p_dev_addr(self):
        """Return the p2p_device_address STATUS field."""
        return self.get_status_field("p2p_device_address")

    def p2p_interface_addr(self):
        """Return the address STATUS field of the group interface."""
        return self.get_group_status_field("address")

    def p2p_listen(self):
        """Issue P2P_LISTEN and return the reply."""
        return self.global_request("P2P_LISTEN")

    def p2p_find(self, social=False, dev_id=None, dev_type=None):
        """Issue P2P_FIND with the optional filters and return the reply."""
        cmd = "P2P_FIND"
        if social:
            cmd = cmd + " type=social"
        if dev_id:
            cmd = cmd + " dev_id=" + dev_id
        if dev_type:
            cmd = cmd + " dev_type=" + dev_type
        return self.global_request(cmd)

    def p2p_stop_find(self):
        """Issue P2P_STOP_FIND and return the reply."""
        return self.global_request("P2P_STOP_FIND")

    def wps_read_pin(self):
        """Store and return the (currently fixed) WPS PIN."""
        #TODO: make this random
        self.pin = "12345670"
        return self.pin

    def peer_known(self, peer, full=True):
        """Return True if P2P_PEER knows peer.

        With full set, a peer flagged [PROBE_REQ_ONLY] does not count.
        """
        res = self.global_request("P2P_PEER " + peer)
        if peer.lower() not in res.lower():
            return False
        if not full:
            return True
        return "[PROBE_REQ_ONLY]" not in res

    def discover_peer(self, peer, full=True, timeout=15, social=True, force_find=False):
        """Run P2P_FIND until peer is known; return False after timeout s."""
        logger.info(self.ifname + ": Trying to discover peer " + peer)
        if not force_find and self.peer_known(peer, full):
            return True
        self.p2p_find(social)
        count = 0
        while count < timeout:
            time.sleep(1)
            count = count + 1
            if self.peer_known(peer, full):
                return True
        return False

    def get_peer(self, peer):
        """Return the name=value lines of P2P_PEER as a dict.

        Raises Exception if the reply does not mention peer.
        """
        res = self.global_request("P2P_PEER " + peer)
        if peer.lower() not in res.lower():
            raise Exception("Peer information not available")
        vals = dict()
        for line in res.splitlines():
            if '=' in line:
                name, value = line.split('=', 1)
                vals[name] = value
        return vals

    def group_form_result(self, ev, expect_failure=False, go_neg_res=None):
        """Parse a group formation event into a result dict.

        With expect_failure, ev must not be P2P-GROUP-STARTED; a
        P2P-GO-NEG-FAILURE event yields {'result', 'status'} and anything
        else yields None. Otherwise ev must be a P2P-GROUP-STARTED event;
        the group interface name is also stored in self.group_ifname. The
        ip_* keys are filled only when the event carries them. go_neg_res,
        when given, is a P2P-GO-NEG-SUCCESS event adding go_neg_role/freq.
        """
        if expect_failure:
            if "P2P-GROUP-STARTED" in ev:
                raise Exception("Group formation succeeded when expecting failure")
            m = re.search(r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)', ev)
            if m is None:
                return None
            return {'result': 'go-neg-failed', 'status': int(m.group(2))}

        if "P2P-GROUP-STARTED" not in ev:
            raise Exception("No P2P-GROUP-STARTED event seen")

        with_ip = re.search(self._GROUP_STARTED_RE + self._GROUP_STARTED_IP_RE,
                            ev)
        m = with_ip or re.search(self._GROUP_STARTED_RE, ev)
        if m is None:
            raise Exception("Could not parse P2P-GROUP-STARTED")

        res = {}
        res['result'] = 'success'
        res['ifname'] = m.group(2)
        self.group_ifname = m.group(2)
        res['role'] = m.group(3)
        res['ssid'] = m.group(4)
        res['freq'] = m.group(5)
        res['persistent'] = "[PERSISTENT]" in ev
        key = m.group(6)
        p = re.match(r'psk=([0-9a-f]*)', key)
        if p:
            res['psk'] = p.group(1)
        p = re.match(r'passphrase="(.*)"', key)
        if p:
            res['passphrase'] = p.group(1)
        res['go_dev_addr'] = m.group(7)

        # Only trust the IP fields when the event really contained them;
        # trailing text such as " [PERSISTENT]" is not an address.
        if with_ip:
            if with_ip.group(8):
                res['ip_addr'] = with_ip.group(8)
            res['ip_mask'] = with_ip.group(9)
            res['go_ip_addr'] = with_ip.group(10)

        if go_neg_res:
            m = re.search(r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)',
                          go_neg_res)
            if m is None:
                raise Exception("Could not parse P2P-GO-NEG-SUCCESS")
            res['go_neg_role'] = m.group(2)
            res['go_neg_freq'] = m.group(3)

        return res

    def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False, freq=None):
        """Authorize GO negotiation with peer (P2P_CONNECT ... auth)."""
        if not self.discover_peer(peer):
            raise Exception("Peer " + peer + " not found")
        self.dump_monitor()
        cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth"
        if go_intent:
            cmd = cmd + ' go_intent=' + str(go_intent)
        if freq:
            cmd = cmd + ' freq=' + str(freq)
        if persistent:
            cmd = cmd + " persistent"
        if "OK" in self.global_request(cmd):
            return None
        raise Exception("P2P_CONNECT (auth) failed")

    def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
        """Wait for the outcome of an authorized GO negotiation."""
        return self._wait_go_neg_result(timeout, expect_failure)

    def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False, persistent=False, freq=None):
        """Initiate GO negotiation with peer.

        Returns None right away when timeout is 0; otherwise waits for and
        returns the group formation result.
        """
        if not self.discover_peer(peer):
            raise Exception("Peer " + peer + " not found")
        self.dump_monitor()
        if pin:
            cmd = "P2P_CONNECT " + peer + " " + pin + " " + method
        else:
            cmd = "P2P_CONNECT " + peer + " " + method
        if go_intent:
            cmd = cmd + ' go_intent=' + str(go_intent)
        if freq:
            cmd = cmd + ' freq=' + str(freq)
        if persistent:
            cmd = cmd + " persistent"
        if "OK" not in self.global_request(cmd):
            raise Exception("P2P_CONNECT failed")
        if timeout == 0:
            self.dump_monitor()
            return None
        return self._wait_go_neg_result(timeout, expect_failure)

    # ------------------------------------------------------------------
    # Event handling
    # ------------------------------------------------------------------

    def wait_event(self, events, timeout=10):
        """Return the first interface event matching events, or None."""
        return self._wait_for_event(self.mon, self.ifname, events, timeout)

    def wait_global_event(self, events, timeout):
        """Like wait_event(), but on the global monitor when available."""
        if self.global_iface is None:
            return self.wait_event(events, timeout)
        return self._wait_for_event(self.global_mon, self._global_label(),
                                    events, timeout)

    def wait_go_ending_session(self):
        """Expect P2P-GROUP-REMOVED with reason GO_ENDING_SESSION."""
        ev = self.wait_event(["P2P-GROUP-REMOVED"], timeout=3)
        if ev is None:
            raise Exception("Group removal event timed out")
        if "reason=GO_ENDING_SESSION" not in ev:
            raise Exception("Unexpected group removal reason")

    def dump_monitor(self):
        """Drain (and log) all pending events on the attached monitors."""
        if self.mon is not None:
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.ifname + ": " + ev)
        if self.global_mon is not None:
            label = self._global_label()
            while self.global_mon.pending():
                ev = self.global_mon.recv()
                logger.debug(label + ": " + ev)

    # ------------------------------------------------------------------
    # P2P groups
    # ------------------------------------------------------------------

    def remove_group(self, ifname=None):
        """Remove the P2P group (default: current group interface)."""
        if ifname is None:
            ifname = self.group_ifname if self.group_ifname else self.ifname
        if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname):
            raise Exception("Group could not be removed")
        self.group_ifname = None

    def p2p_start_go(self, persistent=None, freq=None):
        """Start an autonomous GO and return the parsed group result.

        persistent: None omits the option, True adds "persistent", any
        other value is sent as "persistent=<value>".
        """
        self.dump_monitor()
        cmd = "P2P_GROUP_ADD"
        if persistent is None:
            pass
        elif persistent is True:
            cmd = cmd + " persistent"
        else:
            cmd = cmd + " persistent=" + str(persistent)
        if freq:
            cmd = cmd + " freq=" + str(freq)
        if "OK" not in self.global_request(cmd):
            raise Exception("P2P_GROUP_ADD failed")
        ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5)
        if ev is None:
            raise Exception("GO start up timed out")
        self.dump_monitor()
        return self.group_form_result(ev)

    def p2p_go_authorize_client(self, pin):
        """Authorize a client on the group interface with WPS PIN."""
        cmd = "WPS_PIN any " + pin
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to authorize client connection on GO")
        return None

    def p2p_go_authorize_client_pbc(self):
        """Authorize a client on the group interface with WPS PBC."""
        cmd = "WPS_PBC"
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to authorize client connection on GO")
        return None

    def p2p_connect_group(self, go_addr, pin, timeout=0, social=False):
        """Join the group of go_addr.

        Returns None right away when timeout is 0; otherwise waits for
        P2P-GROUP-STARTED and returns the parsed result.
        """
        self.dump_monitor()
        if not self.discover_peer(go_addr, social=social):
            raise Exception("GO " + go_addr + " not found")
        self.dump_monitor()
        cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
        if "OK" not in self.global_request(cmd):
            raise Exception("P2P_CONNECT(join) failed")
        if timeout == 0:
            self.dump_monitor()
            return None
        ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
        if ev is None:
            raise Exception("Joining the group timed out")
        self.dump_monitor()
        return self.group_form_result(ev)

    # ------------------------------------------------------------------
    # TDLS
    # ------------------------------------------------------------------

    def tdls_setup(self, peer):
        """Request TDLS setup with peer; raise Exception on FAIL."""
        cmd = "TDLS_SETUP " + peer
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to request TDLS setup")
        return None

    def tdls_teardown(self, peer):
        """Request TDLS teardown with peer; raise Exception on FAIL."""
        cmd = "TDLS_TEARDOWN " + peer
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to request TDLS teardown")
        return None

    # ------------------------------------------------------------------
    # Station operations
    # ------------------------------------------------------------------

    def connect(self, ssid=None, ssid2=None, **kwargs):
        """Add a network from the keyword parameters and connect to it.

        ssid is sent quoted, ssid2 unquoted. Falsy keyword values are
        skipped. only_add_network returns after configuration;
        wait_connect=False selects the network without waiting. Returns the
        network id.
        """
        logger.info("Connect STA " + self.ifname + " to AP")
        netid = self.add_network()
        if ssid:
            self.set_network_quoted(netid, "ssid", ssid)
        elif ssid2:
            self.set_network(netid, "ssid", ssid2)

        for field in self._NET_QUOTED:
            if kwargs.get(field):
                self.set_network_quoted(netid, field, kwargs[field])

        for field in self._NET_NOT_QUOTED:
            if kwargs.get(field):
                self.set_network(netid, field, kwargs[field])

        if kwargs.get('raw_psk'):
            self.set_network(netid, "psk", kwargs['raw_psk'])
        if kwargs.get('password_hex'):
            self.set_network(netid, "password", kwargs['password_hex'])
        if kwargs.get('peerkey'):
            self.set_network(netid, "peerkey", "1")
        if kwargs.get('okc'):
            self.set_network(netid, "proactive_key_caching", "1")
        if kwargs.get('ocsp'):
            self.set_network(netid, "ocsp", str(kwargs['ocsp']))
        if kwargs.get('only_add_network'):
            return netid
        if kwargs.get('wait_connect', True):
            # EAP networks get a longer association timeout
            if "eap" in kwargs:
                self.connect_network(netid, timeout=20)
            else:
                self.connect_network(netid)
        else:
            self.dump_monitor()
            self.select_network(netid)
        return netid

    def scan(self, type=None, freq=None, no_wait=False):
        """Trigger a scan; unless no_wait, wait for the scan results event."""
        if type:
            cmd = "SCAN TYPE=" + type
        else:
            cmd = "SCAN"
        if freq:
            cmd = cmd + " freq=" + str(freq)
        if not no_wait:
            self.dump_monitor()
        if "OK" not in self.request(cmd):
            raise Exception("Failed to trigger scan")
        if no_wait:
            return
        ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
        if ev is None:
            raise Exception("Scan timed out")

    def roam(self, bssid):
        """Issue ROAM to bssid and wait for CTRL-EVENT-CONNECTED."""
        self.dump_monitor()
        self.request("ROAM " + bssid)
        ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
        if ev is None:
            raise Exception("Roaming with the AP timed out")
        self.dump_monitor()

    def roam_over_ds(self, bssid):
        """Issue FT_DS to bssid and wait for CTRL-EVENT-CONNECTED."""
        self.dump_monitor()
        self.request("FT_DS " + bssid)
        ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
        if ev is None:
            raise Exception("Roaming with the AP timed out")
        self.dump_monitor()

    def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None,
                new_passphrase=None, no_wait=False):
        """Run WPS_REG against bssid, optionally pushing a new configuration.

        With new_ssid, the SSID and passphrase are sent hex-encoded and
        WPS-SUCCESS is awaited; without it, WPS-CRED-RECEIVED and then
        WPS-FAIL are awaited. Finally waits for CTRL-EVENT-CONNECTED.
        Returns right after the request when no_wait is set.
        """
        self.dump_monitor()
        if new_ssid:
            self.request("WPS_REG " + bssid + " " + pin + " " +
                         self._hex(new_ssid) + " " + key_mgmt + " " +
                         cipher + " " + self._hex(new_passphrase))
            if no_wait:
                return
            ev = self.wait_event(["WPS-SUCCESS"], timeout=15)
        else:
            self.request("WPS_REG " + bssid + " " + pin)
            if no_wait:
                return
            ev = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15)
            if ev is None:
                raise Exception("WPS cred timed out")
            ev = self.wait_event(["WPS-FAIL"], timeout=15)
        if ev is None:
            raise Exception("WPS timed out")
        ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
        if ev is None:
            raise Exception("Association with the AP timed out")

    def relog(self):
        """Issue RELOG."""
        self.request("RELOG")

    def wait_completed(self, timeout=10):
        """Poll wpa_state every 0.5 s until COMPLETED; raise on timeout."""
        for _ in range(0, timeout * 2):
            if self.get_status_field("wpa_state") == "COMPLETED":
                return
            time.sleep(0.5)
        raise Exception("Timeout while waiting for COMPLETED state")

    def get_capability(self, field):
        """Return the GET_CAPABILITY reply split on spaces, or None on FAIL."""
        res = self.request("GET_CAPABILITY " + field)
        if "FAIL" in res:
            return None
        return res.split(' ')

    def get_bss(self, bssid):
        """Return the BSS reply for bssid as a dict of name=value lines."""
        vals = dict()
        for line in self.request("BSS " + bssid).splitlines():
            name, value = line.split('=', 1)
            vals[name] = value
        return vals

    def get_pmksa(self, bssid):
        """Return the PMKSA entry whose line mentions bssid, or None."""
        for line in self.request("PMKSA").splitlines():
            if bssid not in line:
                continue
            index, aa, pmkid, expiration, opportunistic = line.split(' ')
            return {'index': index,
                    'pmkid': pmkid,
                    'expiration': expiration,
                    'opportunistic': opportunistic}
        return None

    def get_sta(self, addr, info=None, next=False):
        """Query STA / STA-NEXT / STA-FIRST and return the reply as a dict.

        addr=None issues STA-FIRST. The first reply line is stored under
        'addr'; the remaining lines are parsed as name=value.
        """
        cmd = "STA-NEXT " if next else "STA "
        if addr is None:
            res = self.request("STA-FIRST")
        elif info:
            res = self.request(cmd + addr + " " + info)
        else:
            res = self.request(cmd + addr)
        lines = res.splitlines()
        vals = dict()
        if lines:
            vals['addr'] = lines[0]
        for line in lines[1:]:
            name, value = line.split('=', 1)
            vals[name] = value
        return vals