]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/wpasupplicant.py
tests: Stop WPS ER on station reset
[thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
1 # Python class for controlling wpa_supplicant
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 import re
11 import subprocess
12 import wpaspy
13
14 logger = logging.getLogger()
15 wpas_ctrl = '/var/run/wpa_supplicant'
16
17 class WpaSupplicant:
18 def __init__(self, ifname=None, global_iface=None):
19 self.group_ifname = None
20 if ifname:
21 self.set_ifname(ifname)
22 else:
23 self.ifname = None
24
25 self.global_iface = global_iface
26 if global_iface:
27 self.global_ctrl = wpaspy.Ctrl(global_iface)
28 self.global_mon = wpaspy.Ctrl(global_iface)
29 self.global_mon.attach()
30
31 def set_ifname(self, ifname):
32 self.ifname = ifname
33 self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
34 self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
35 self.mon.attach()
36
37 def remove_ifname(self):
38 if self.ifname:
39 self.mon.detach()
40 self.mon = None
41 self.ctrl = None
42 self.ifname = None
43
44 def interface_add(self, ifname, driver="nl80211", drv_params=None):
45 try:
46 groups = subprocess.check_output(["id"])
47 group = "admin" if "(admin)" in groups else "adm"
48 except Exception, e:
49 group = "admin"
50 cmd = "INTERFACE_ADD " + ifname + "\t\t" + driver + "\tDIR=/var/run/wpa_supplicant GROUP=" + group
51 if drv_params:
52 cmd = cmd + '\t' + drv_params
53 if "FAIL" in self.global_request(cmd):
54 raise Exception("Failed to add a dynamic wpa_supplicant interface")
55 self.set_ifname(ifname)
56
57 def interface_remove(self, ifname):
58 self.remove_ifname()
59 self.global_request("INTERFACE_REMOVE " + ifname)
60
61 def request(self, cmd):
62 logger.debug(self.ifname + ": CTRL: " + cmd)
63 return self.ctrl.request(cmd)
64
65 def global_request(self, cmd):
66 if self.global_iface is None:
67 self.request(cmd)
68 else:
69 ifname = self.ifname or self.global_iface
70 logger.debug(ifname + ": CTRL: " + cmd)
71 return self.global_ctrl.request(cmd)
72
73 def group_request(self, cmd):
74 if self.group_ifname and self.group_ifname != self.ifname:
75 logger.debug(self.group_ifname + ": CTRL: " + cmd)
76 gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
77 return gctrl.request(cmd)
78 return self.request(cmd)
79
80 def ping(self):
81 return "PONG" in self.request("PING")
82
83 def reset(self):
84 res = self.request("FLUSH")
85 if not "OK" in res:
86 logger.info("FLUSH to " + self.ifname + " failed: " + res)
87 self.request("WPS_ER_STOP")
88 self.request("SET external_sim 0")
89 self.request("SET hessid 00:00:00:00:00:00")
90 self.request("SET access_network_type 15")
91 self.request("SET p2p_add_cli_chan 0")
92 self.request("SET p2p_no_go_freq ")
93 self.request("SET p2p_pref_chan ")
94 self.request("SET p2p_no_group_iface 1")
95 self.request("SET p2p_go_intent 7")
96 self.group_ifname = None
97 self.dump_monitor()
98
99 iter = 0
100 while iter < 60:
101 state = self.get_driver_status_field("scan_state")
102 if "SCAN_STARTED" in state or "SCAN_REQUESTED" in state:
103 logger.info(self.ifname + ": Waiting for scan operation to complete before continuing")
104 time.sleep(1)
105 else:
106 break
107 iter = iter + 1
108 if iter == 60:
109 logger.error(self.ifname + ": Driver scan state did not clear")
110 print "Trying to clear cfg80211/mac80211 scan state"
111 try:
112 cmd = ["sudo", "ifconfig", self.ifname, "down"]
113 subprocess.call(cmd)
114 except subprocess.CalledProcessError, e:
115 logger.info("ifconfig failed: " + str(e.returncode))
116 logger.info(e.output)
117 try:
118 cmd = ["sudo", "ifconfig", self.ifname, "up"]
119 subprocess.call(cmd)
120 except subprocess.CalledProcessError, e:
121 logger.info("ifconfig failed: " + str(e.returncode))
122 logger.info(e.output)
123 if iter > 0:
124 # The ongoing scan could have discovered BSSes or P2P peers
125 logger.info("Run FLUSH again since scan was in progress")
126 self.request("FLUSH")
127 self.dump_monitor()
128
129 if not self.ping():
130 logger.info("No PING response from " + self.ifname + " after reset")
131
132 def add_network(self):
133 id = self.request("ADD_NETWORK")
134 if "FAIL" in id:
135 raise Exception("ADD_NETWORK failed")
136 return int(id)
137
138 def remove_network(self, id):
139 id = self.request("REMOVE_NETWORK " + str(id))
140 if "FAIL" in id:
141 raise Exception("REMOVE_NETWORK failed")
142 return None
143
144 def get_network(self, id, field):
145 res = self.request("GET_NETWORK " + str(id) + " " + field)
146 if res == "FAIL\n":
147 return None
148 return res
149
150 def set_network(self, id, field, value):
151 res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
152 if "FAIL" in res:
153 raise Exception("SET_NETWORK failed")
154 return None
155
156 def set_network_quoted(self, id, field, value):
157 res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
158 if "FAIL" in res:
159 raise Exception("SET_NETWORK failed")
160 return None
161
162 def list_networks(self):
163 res = self.request("LIST_NETWORKS")
164 lines = res.splitlines()
165 networks = []
166 for l in lines:
167 if "network id" in l:
168 continue
169 [id,ssid,bssid,flags] = l.split('\t')
170 network = {}
171 network['id'] = id
172 network['ssid'] = ssid
173 network['bssid'] = bssid
174 network['flags'] = flags
175 networks.append(network)
176 return networks
177
178 def hs20_enable(self):
179 self.request("SET interworking 1")
180 self.request("SET hs20 1")
181
182 def add_cred(self):
183 id = self.request("ADD_CRED")
184 if "FAIL" in id:
185 raise Exception("ADD_CRED failed")
186 return int(id)
187
188 def remove_cred(self, id):
189 id = self.request("REMOVE_CRED " + str(id))
190 if "FAIL" in id:
191 raise Exception("REMOVE_CRED failed")
192 return None
193
194 def set_cred(self, id, field, value):
195 res = self.request("SET_CRED " + str(id) + " " + field + " " + value)
196 if "FAIL" in res:
197 raise Exception("SET_CRED failed")
198 return None
199
200 def set_cred_quoted(self, id, field, value):
201 res = self.request("SET_CRED " + str(id) + " " + field + ' "' + value + '"')
202 if "FAIL" in res:
203 raise Exception("SET_CRED failed")
204 return None
205
206 def add_cred_values(self, params):
207 id = self.add_cred()
208
209 quoted = [ "realm", "username", "password", "domain", "imsi",
210 "excluded_ssid", "milenage", "ca_cert", "client_cert",
211 "private_key" ]
212 for field in quoted:
213 if field in params:
214 self.set_cred_quoted(id, field, params[field])
215
216 not_quoted = [ "eap", "roaming_consortium",
217 "required_roaming_consortium" ]
218 for field in not_quoted:
219 if field in params:
220 self.set_cred(id, field, params[field])
221
222 return id;
223
224 def select_network(self, id):
225 id = self.request("SELECT_NETWORK " + str(id))
226 if "FAIL" in id:
227 raise Exception("SELECT_NETWORK failed")
228 return None
229
230 def connect_network(self, id, timeout=10):
231 self.dump_monitor()
232 self.select_network(id)
233 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout)
234 if ev is None:
235 raise Exception("Association with the AP timed out")
236 self.dump_monitor()
237
238 def get_status(self):
239 res = self.request("STATUS")
240 lines = res.splitlines()
241 vals = dict()
242 for l in lines:
243 try:
244 [name,value] = l.split('=', 1)
245 vals[name] = value
246 except ValueError, e:
247 logger.info(self.ifname + ": Ignore unexpected STATUS line: " + l)
248 return vals
249
250 def get_status_field(self, field):
251 vals = self.get_status()
252 if field in vals:
253 return vals[field]
254 return None
255
256 def get_group_status(self):
257 res = self.group_request("STATUS")
258 lines = res.splitlines()
259 vals = dict()
260 for l in lines:
261 [name,value] = l.split('=', 1)
262 vals[name] = value
263 return vals
264
265 def get_group_status_field(self, field):
266 vals = self.get_group_status()
267 if field in vals:
268 return vals[field]
269 return None
270
271 def get_driver_status(self):
272 res = self.request("STATUS-DRIVER")
273 lines = res.splitlines()
274 vals = dict()
275 for l in lines:
276 [name,value] = l.split('=', 1)
277 vals[name] = value
278 return vals
279
280 def get_driver_status_field(self, field):
281 vals = self.get_driver_status()
282 if field in vals:
283 return vals[field]
284 return None
285
286 def p2p_dev_addr(self):
287 return self.get_status_field("p2p_device_address")
288
289 def p2p_interface_addr(self):
290 return self.get_group_status_field("address")
291
292 def p2p_listen(self):
293 return self.global_request("P2P_LISTEN")
294
295 def p2p_find(self, social=False, dev_id=None, dev_type=None):
296 cmd = "P2P_FIND"
297 if social:
298 cmd = cmd + " type=social"
299 if dev_id:
300 cmd = cmd + " dev_id=" + dev_id
301 if dev_type:
302 cmd = cmd + " dev_type=" + dev_type
303 return self.global_request(cmd)
304
305 def p2p_stop_find(self):
306 return self.global_request("P2P_STOP_FIND")
307
308 def wps_read_pin(self):
309 #TODO: make this random
310 self.pin = "12345670"
311 return self.pin
312
313 def peer_known(self, peer, full=True):
314 res = self.global_request("P2P_PEER " + peer)
315 if peer.lower() not in res.lower():
316 return False
317 if not full:
318 return True
319 return "[PROBE_REQ_ONLY]" not in res
320
321 def discover_peer(self, peer, full=True, timeout=15, social=True, force_find=False):
322 logger.info(self.ifname + ": Trying to discover peer " + peer)
323 if not force_find and self.peer_known(peer, full):
324 return True
325 self.p2p_find(social)
326 count = 0
327 while count < timeout:
328 time.sleep(1)
329 count = count + 1
330 if self.peer_known(peer, full):
331 return True
332 return False
333
334 def get_peer(self, peer):
335 res = self.global_request("P2P_PEER " + peer)
336 if peer.lower() not in res.lower():
337 raise Exception("Peer information not available")
338 lines = res.splitlines()
339 vals = dict()
340 for l in lines:
341 if '=' in l:
342 [name,value] = l.split('=', 1)
343 vals[name] = value
344 return vals
345
346 def group_form_result(self, ev, expect_failure=False, go_neg_res=None):
347 if expect_failure:
348 if "P2P-GROUP-STARTED" in ev:
349 raise Exception("Group formation succeeded when expecting failure")
350 exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)'
351 s = re.split(exp, ev)
352 if len(s) < 3:
353 return None
354 res = {}
355 res['result'] = 'go-neg-failed'
356 res['status'] = int(s[2])
357 return res
358
359 if "P2P-GROUP-STARTED" not in ev:
360 raise Exception("No P2P-GROUP-STARTED event seen")
361
362 exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*) ip_addr=([0-9.]*) ip_mask=([0-9.]*) go_ip_addr=([0-9.]*)'
363 s = re.split(exp, ev)
364 if len(s) < 11:
365 exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)'
366 s = re.split(exp, ev)
367 if len(s) < 8:
368 raise Exception("Could not parse P2P-GROUP-STARTED")
369 res = {}
370 res['result'] = 'success'
371 res['ifname'] = s[2]
372 self.group_ifname = s[2]
373 res['role'] = s[3]
374 res['ssid'] = s[4]
375 res['freq'] = s[5]
376 if "[PERSISTENT]" in ev:
377 res['persistent'] = True
378 else:
379 res['persistent'] = False
380 p = re.match(r'psk=([0-9a-f]*)', s[6])
381 if p:
382 res['psk'] = p.group(1)
383 p = re.match(r'passphrase="(.*)"', s[6])
384 if p:
385 res['passphrase'] = p.group(1)
386 res['go_dev_addr'] = s[7]
387
388 if len(s) > 8 and len(s[8]) > 0:
389 res['ip_addr'] = s[8]
390 if len(s) > 9:
391 res['ip_mask'] = s[9]
392 if len(s) > 10:
393 res['go_ip_addr'] = s[10]
394
395 if go_neg_res:
396 exp = r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)'
397 s = re.split(exp, go_neg_res)
398 if len(s) < 4:
399 raise Exception("Could not parse P2P-GO-NEG-SUCCESS")
400 res['go_neg_role'] = s[2]
401 res['go_neg_freq'] = s[3]
402
403 return res
404
405 def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False, freq=None):
406 if not self.discover_peer(peer):
407 raise Exception("Peer " + peer + " not found")
408 self.dump_monitor()
409 cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth"
410 if go_intent:
411 cmd = cmd + ' go_intent=' + str(go_intent)
412 if freq:
413 cmd = cmd + ' freq=' + str(freq)
414 if persistent:
415 cmd = cmd + " persistent"
416 if "OK" in self.global_request(cmd):
417 return None
418 raise Exception("P2P_CONNECT (auth) failed")
419
420 def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
421 go_neg_res = None
422 ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
423 "P2P-GO-NEG-FAILURE"], timeout);
424 if ev is None:
425 if expect_failure:
426 return None
427 raise Exception("Group formation timed out")
428 if "P2P-GO-NEG-SUCCESS" in ev:
429 go_neg_res = ev
430 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout);
431 if ev is None:
432 if expect_failure:
433 return None
434 raise Exception("Group formation timed out")
435 self.dump_monitor()
436 return self.group_form_result(ev, expect_failure, go_neg_res)
437
438 def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False, persistent=False, freq=None):
439 if not self.discover_peer(peer):
440 raise Exception("Peer " + peer + " not found")
441 self.dump_monitor()
442 if pin:
443 cmd = "P2P_CONNECT " + peer + " " + pin + " " + method
444 else:
445 cmd = "P2P_CONNECT " + peer + " " + method
446 if go_intent:
447 cmd = cmd + ' go_intent=' + str(go_intent)
448 if freq:
449 cmd = cmd + ' freq=' + str(freq)
450 if persistent:
451 cmd = cmd + " persistent"
452 if "OK" in self.global_request(cmd):
453 if timeout == 0:
454 self.dump_monitor()
455 return None
456 go_neg_res = None
457 ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
458 "P2P-GO-NEG-FAILURE"], timeout)
459 if ev is None:
460 if expect_failure:
461 return None
462 raise Exception("Group formation timed out")
463 if "P2P-GO-NEG-SUCCESS" in ev:
464 go_neg_res = ev
465 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
466 if ev is None:
467 if expect_failure:
468 return None
469 raise Exception("Group formation timed out")
470 self.dump_monitor()
471 return self.group_form_result(ev, expect_failure, go_neg_res)
472 raise Exception("P2P_CONNECT failed")
473
474 def wait_event(self, events, timeout=10):
475 start = os.times()[4]
476 while True:
477 while self.mon.pending():
478 ev = self.mon.recv()
479 logger.debug(self.ifname + ": " + ev)
480 for event in events:
481 if event in ev:
482 return ev
483 now = os.times()[4]
484 remaining = start + timeout - now
485 if remaining <= 0:
486 break
487 if not self.mon.pending(timeout=remaining):
488 break
489 return None
490
491 def wait_global_event(self, events, timeout):
492 if self.global_iface is None:
493 self.wait_event(events, timeout)
494 else:
495 start = os.times()[4]
496 while True:
497 while self.global_mon.pending():
498 ev = self.global_mon.recv()
499 logger.debug(self.ifname + "(global): " + ev)
500 for event in events:
501 if event in ev:
502 return ev
503 now = os.times()[4]
504 remaining = start + timeout - now
505 if remaining <= 0:
506 break
507 if not self.global_mon.pending(timeout=remaining):
508 break
509 return None
510
511 def wait_go_ending_session(self):
512 ev = self.wait_event(["P2P-GROUP-REMOVED"], timeout=3)
513 if ev is None:
514 raise Exception("Group removal event timed out")
515 if "reason=GO_ENDING_SESSION" not in ev:
516 raise Exception("Unexpected group removal reason")
517
518 def dump_monitor(self):
519 while self.mon.pending():
520 ev = self.mon.recv()
521 logger.debug(self.ifname + ": " + ev)
522 while self.global_mon.pending():
523 ev = self.global_mon.recv()
524 logger.debug(self.ifname + "(global): " + ev)
525
526 def remove_group(self, ifname=None):
527 if ifname is None:
528 ifname = self.group_ifname if self.group_ifname else self.ifname
529 if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname):
530 raise Exception("Group could not be removed")
531 self.group_ifname = None
532
533 def p2p_start_go(self, persistent=None, freq=None):
534 self.dump_monitor()
535 cmd = "P2P_GROUP_ADD"
536 if persistent is None:
537 pass
538 elif persistent is True:
539 cmd = cmd + " persistent"
540 else:
541 cmd = cmd + " persistent=" + str(persistent)
542 if freq:
543 cmd = cmd + " freq=" + str(freq)
544 if "OK" in self.global_request(cmd):
545 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5)
546 if ev is None:
547 raise Exception("GO start up timed out")
548 self.dump_monitor()
549 return self.group_form_result(ev)
550 raise Exception("P2P_GROUP_ADD failed")
551
552 def p2p_go_authorize_client(self, pin):
553 cmd = "WPS_PIN any " + pin
554 if "FAIL" in self.group_request(cmd):
555 raise Exception("Failed to authorize client connection on GO")
556 return None
557
558 def p2p_go_authorize_client_pbc(self):
559 cmd = "WPS_PBC"
560 if "FAIL" in self.group_request(cmd):
561 raise Exception("Failed to authorize client connection on GO")
562 return None
563
564 def p2p_connect_group(self, go_addr, pin, timeout=0, social=False):
565 self.dump_monitor()
566 if not self.discover_peer(go_addr, social=social):
567 raise Exception("GO " + go_addr + " not found")
568 self.dump_monitor()
569 cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
570 if "OK" in self.global_request(cmd):
571 if timeout == 0:
572 self.dump_monitor()
573 return None
574 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
575 if ev is None:
576 raise Exception("Joining the group timed out")
577 self.dump_monitor()
578 return self.group_form_result(ev)
579 raise Exception("P2P_CONNECT(join) failed")
580
581 def tdls_setup(self, peer):
582 cmd = "TDLS_SETUP " + peer
583 if "FAIL" in self.group_request(cmd):
584 raise Exception("Failed to request TDLS setup")
585 return None
586
587 def tdls_teardown(self, peer):
588 cmd = "TDLS_TEARDOWN " + peer
589 if "FAIL" in self.group_request(cmd):
590 raise Exception("Failed to request TDLS teardown")
591 return None
592
593 def connect(self, ssid=None, ssid2=None, **kwargs):
594 logger.info("Connect STA " + self.ifname + " to AP")
595 id = self.add_network()
596 if ssid:
597 self.set_network_quoted(id, "ssid", ssid)
598 elif ssid2:
599 self.set_network(id, "ssid", ssid2)
600
601 quoted = [ "psk", "identity", "anonymous_identity", "password",
602 "ca_cert", "client_cert", "private_key",
603 "private_key_passwd", "ca_cert2", "client_cert2",
604 "private_key2", "phase1", "phase2", "domain_suffix_match",
605 "altsubject_match", "subject_match", "pac_file", "dh_file" ]
606 for field in quoted:
607 if field in kwargs and kwargs[field]:
608 self.set_network_quoted(id, field, kwargs[field])
609
610 not_quoted = [ "proto", "key_mgmt", "ieee80211w", "pairwise",
611 "group", "wep_key0", "scan_freq", "eap",
612 "eapol_flags", "fragment_size", "scan_ssid", "auth_alg" ]
613 for field in not_quoted:
614 if field in kwargs and kwargs[field]:
615 self.set_network(id, field, kwargs[field])
616
617 if "raw_psk" in kwargs and kwargs['raw_psk']:
618 self.set_network(id, "psk", kwargs['raw_psk'])
619 if "password_hex" in kwargs and kwargs['password_hex']:
620 self.set_network(id, "password", kwargs['password_hex'])
621 if "peerkey" in kwargs and kwargs['peerkey']:
622 self.set_network(id, "peerkey", "1")
623 if "okc" in kwargs and kwargs['okc']:
624 self.set_network(id, "proactive_key_caching", "1")
625 if "ocsp" in kwargs and kwargs['ocsp']:
626 self.set_network(id, "ocsp", str(kwargs['ocsp']))
627 if "only_add_network" in kwargs and kwargs['only_add_network']:
628 return id
629 if "wait_connect" not in kwargs or kwargs['wait_connect']:
630 if "eap" in kwargs:
631 self.connect_network(id, timeout=20)
632 else:
633 self.connect_network(id)
634 else:
635 self.dump_monitor()
636 self.select_network(id)
637 return id
638
639 def scan(self, type=None, freq=None, no_wait=False):
640 if type:
641 cmd = "SCAN TYPE=" + type
642 else:
643 cmd = "SCAN"
644 if freq:
645 cmd = cmd + " freq=" + freq
646 if not no_wait:
647 self.dump_monitor()
648 if not "OK" in self.request(cmd):
649 raise Exception("Failed to trigger scan")
650 if no_wait:
651 return
652 ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
653 if ev is None:
654 raise Exception("Scan timed out")
655
656 def roam(self, bssid):
657 self.dump_monitor()
658 self.request("ROAM " + bssid)
659 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
660 if ev is None:
661 raise Exception("Roaming with the AP timed out")
662 self.dump_monitor()
663
664 def roam_over_ds(self, bssid):
665 self.dump_monitor()
666 self.request("FT_DS " + bssid)
667 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
668 if ev is None:
669 raise Exception("Roaming with the AP timed out")
670 self.dump_monitor()
671
672 def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None,
673 new_passphrase=None, no_wait=False):
674 self.dump_monitor()
675 if new_ssid:
676 self.request("WPS_REG " + bssid + " " + pin + " " +
677 new_ssid.encode("hex") + " " + key_mgmt + " " +
678 cipher + " " + new_passphrase.encode("hex"))
679 if no_wait:
680 return
681 ev = self.wait_event(["WPS-SUCCESS"], timeout=15)
682 else:
683 self.request("WPS_REG " + bssid + " " + pin)
684 if no_wait:
685 return
686 ev = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15)
687 if ev is None:
688 raise Exception("WPS cred timed out")
689 ev = self.wait_event(["WPS-FAIL"], timeout=15)
690 if ev is None:
691 raise Exception("WPS timed out")
692 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
693 if ev is None:
694 raise Exception("Association with the AP timed out")
695
696 def relog(self):
697 self.request("RELOG")
698
699 def wait_completed(self, timeout=10):
700 for i in range(0, timeout * 2):
701 if self.get_status_field("wpa_state") == "COMPLETED":
702 return
703 time.sleep(0.5)
704 raise Exception("Timeout while waiting for COMPLETED state")
705
706 def get_capability(self, field):
707 res = self.request("GET_CAPABILITY " + field)
708 if "FAIL" in res:
709 return None
710 return res.split(' ')
711
712 def get_bss(self, bssid):
713 res = self.request("BSS " + bssid)
714 lines = res.splitlines()
715 vals = dict()
716 for l in lines:
717 [name,value] = l.split('=', 1)
718 vals[name] = value
719 return vals
720
721 def get_pmksa(self, bssid):
722 res = self.request("PMKSA")
723 lines = res.splitlines()
724 for l in lines:
725 if bssid not in l:
726 continue
727 vals = dict()
728 [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
729 vals['index'] = index
730 vals['pmkid'] = pmkid
731 vals['expiration'] = expiration
732 vals['opportunistic'] = opportunistic
733 return vals
734 return None
735
736 def get_sta(self, addr, info=None, next=False):
737 cmd = "STA-NEXT " if next else "STA "
738 if addr is None:
739 res = self.request("STA-FIRST")
740 elif info:
741 res = self.request(cmd + addr + " " + info)
742 else:
743 res = self.request(cmd + addr)
744 lines = res.splitlines()
745 vals = dict()
746 first = True
747 for l in lines:
748 if first:
749 vals['addr'] = l
750 first = False
751 else:
752 [name,value] = l.split('=', 1)
753 vals[name] = value
754 return vals