]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/wpasupplicant.py
tests: Verify STATUS-WPS passphrase on GO
[thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
1 # Python class for controlling wpa_supplicant
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 import binascii
11 import re
12 import struct
13 import subprocess
14 import wpaspy
15
# Root logger used for all debug/info output of this module.
logger = logging.getLogger()
# Base directory joined with an interface name to locate its control socket.
wpas_ctrl = '/var/run/wpa_supplicant'
18
19 class WpaSupplicant:
def __init__(self, ifname=None, global_iface=None):
    """Create the controller and open the requested control connections.

    ifname: interface name; when truthy, set_ifname() is called to open
        the per-interface request and monitor connections.
    global_iface: value passed to wpaspy.Ctrl for the global control
        interface; when truthy, a request connection and an attached
        monitor connection are opened.
    """
    self.group_ifname = None
    # Define every connection attribute up front so that helpers can test
    # them for None instead of raising AttributeError on an instance that
    # was created without an interface or without a global interface.
    self.ifname = None
    self.ctrl = None
    self.mon = None
    self.global_ctrl = None
    self.global_mon = None
    if ifname:
        self.set_ifname(ifname)

    self.global_iface = global_iface
    if global_iface:
        self.global_ctrl = wpaspy.Ctrl(global_iface)
        self.global_mon = wpaspy.Ctrl(global_iface)
        self.global_mon.attach()
32
def set_ifname(self, ifname):
    """Bind this object to ifname and open its control connections.

    Two wpaspy.Ctrl objects are created for the same path: one used for
    requests (self.ctrl) and one attached for events (self.mon).
    """
    path = os.path.join(wpas_ctrl, ifname)
    self.ifname = ifname
    self.ctrl = wpaspy.Ctrl(path)
    self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
    self.mon.attach()
38
def remove_ifname(self):
    """Detach the monitor connection and forget the current interface.

    Does nothing when no interface name is set.
    """
    if not self.ifname:
        return
    self.mon.detach()
    self.mon = self.ctrl = self.ifname = None
45
def interface_add(self, ifname, driver="nl80211", drv_params=None):
    """Add a dynamic interface through the global control interface.

    ifname: name of the interface to add.
    driver: driver name placed in the INTERFACE_ADD command.
    drv_params: optional extra field appended after a tab.

    Raises Exception when the reply contains "FAIL". On success the object
    is bound to the new interface with set_ifname().
    """
    # Pick the control-socket group from the output of "id"; fall back to
    # "admin" when the command cannot be run.
    try:
        groups = subprocess.check_output(["id"])
        if not isinstance(groups, str):
            # Python 3 returns bytes; decode so the substring test works.
            groups = groups.decode("utf-8", "replace")
        group = "admin" if "(admin)" in groups else "adm"
    except (OSError, subprocess.CalledProcessError):
        group = "admin"
    cmd = ("INTERFACE_ADD " + ifname + "\t\t" + driver +
           "\tDIR=/var/run/wpa_supplicant GROUP=" + group)
    if drv_params:
        cmd = cmd + '\t' + drv_params
    if "FAIL" in self.global_request(cmd):
        raise Exception("Failed to add a dynamic wpa_supplicant interface")
    self.set_ifname(ifname)
58
def interface_remove(self, ifname):
    """Drop the local interface binding, then send INTERFACE_REMOVE."""
    self.remove_ifname()
    cmd = "INTERFACE_REMOVE " + ifname
    self.global_request(cmd)
62
def request(self, cmd):
    """Log cmd and send it on the per-interface control connection.

    Returns whatever self.ctrl.request() returns.
    """
    message = self.ifname + ": CTRL: " + cmd
    logger.debug(message)
    reply = self.ctrl.request(cmd)
    return reply
66
def global_request(self, cmd):
    """Send cmd on the global control interface and return the reply.

    When no global interface was configured the command is sent on the
    per-interface connection instead, and that reply is returned.
    """
    if self.global_iface is None:
        # The reply must be propagated: callers test it for "OK"/"FAIL".
        return self.request(cmd)
    ifname = self.ifname or self.global_iface
    logger.debug(ifname + ": CTRL: " + cmd)
    return self.global_ctrl.request(cmd)
74
def group_request(self, cmd):
    """Send cmd to the group interface and return the reply.

    A temporary control connection is opened when a group interface
    different from the main interface is recorded; otherwise the command
    goes through request().
    """
    group = self.group_ifname
    if not group or group == self.ifname:
        return self.request(cmd)
    logger.debug(self.group_ifname + ": CTRL: " + cmd)
    group_ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
    return group_ctrl.request(cmd)
81
def ping(self):
    """Send PING and report whether the reply contains PONG."""
    reply = self.request("PING")
    return "PONG" in reply
84
85 def reset(self):
86 res = self.request("FLUSH")
87 if not "OK" in res:
88 logger.info("FLUSH to " + self.ifname + " failed: " + res)
89 self.request("WPS_ER_STOP")
90 self.request("SET pmf 0")
91 self.request("SET external_sim 0")
92 self.request("SET hessid 00:00:00:00:00:00")
93 self.request("SET access_network_type 15")
94 self.request("SET p2p_add_cli_chan 0")
95 self.request("SET p2p_no_go_freq ")
96 self.request("SET p2p_pref_chan ")
97 self.request("SET p2p_no_group_iface 1")
98 self.request("SET p2p_go_intent 7")
99 self.group_ifname = None
100 self.dump_monitor()
101
102 iter = 0
103 while iter < 60:
104 state = self.get_driver_status_field("scan_state")
105 if "SCAN_STARTED" in state or "SCAN_REQUESTED" in state:
106 logger.info(self.ifname + ": Waiting for scan operation to complete before continuing")
107 time.sleep(1)
108 else:
109 break
110 iter = iter + 1
111 if iter == 60:
112 logger.error(self.ifname + ": Driver scan state did not clear")
113 print "Trying to clear cfg80211/mac80211 scan state"
114 try:
115 cmd = ["sudo", "ifconfig", self.ifname, "down"]
116 subprocess.call(cmd)
117 except subprocess.CalledProcessError, e:
118 logger.info("ifconfig failed: " + str(e.returncode))
119 logger.info(e.output)
120 try:
121 cmd = ["sudo", "ifconfig", self.ifname, "up"]
122 subprocess.call(cmd)
123 except subprocess.CalledProcessError, e:
124 logger.info("ifconfig failed: " + str(e.returncode))
125 logger.info(e.output)
126 if iter > 0:
127 # The ongoing scan could have discovered BSSes or P2P peers
128 logger.info("Run FLUSH again since scan was in progress")
129 self.request("FLUSH")
130 self.dump_monitor()
131
132 if not self.ping():
133 logger.info("No PING response from " + self.ifname + " after reset")
134
def add_network(self):
    """Send ADD_NETWORK and return the new network id as an int.

    Raises Exception when the reply contains "FAIL".
    """
    reply = self.request("ADD_NETWORK")
    if "FAIL" in reply:
        raise Exception("ADD_NETWORK failed")
    return int(reply)
140
def remove_network(self, id):
    """Send REMOVE_NETWORK for id; raise Exception on a FAIL reply."""
    reply = self.request("REMOVE_NETWORK " + str(id))
    if "FAIL" in reply:
        raise Exception("REMOVE_NETWORK failed")
    return None
146
def get_network(self, id, field):
    """Return the GET_NETWORK reply for field, or None on "FAIL\\n"."""
    reply = self.request("GET_NETWORK " + str(id) + " " + field)
    return None if reply == "FAIL\n" else reply
152
def set_network(self, id, field, value):
    """Send SET_NETWORK with value passed through unquoted.

    Raises Exception when the reply contains "FAIL".
    """
    cmd = "SET_NETWORK " + str(id) + " " + field + " " + value
    if "FAIL" in self.request(cmd):
        raise Exception("SET_NETWORK failed")
    return None
158
def set_network_quoted(self, id, field, value):
    """Send SET_NETWORK with value wrapped in double quotes.

    Raises Exception when the reply contains "FAIL".
    """
    cmd = "SET_NETWORK " + str(id) + " " + field + ' "' + value + '"'
    if "FAIL" in self.request(cmd):
        raise Exception("SET_NETWORK failed")
    return None
164
def list_networks(self):
    """Return the LIST_NETWORKS reply as a list of dicts.

    Each dict has the string keys 'id', 'ssid', 'bssid' and 'flags'. Lines
    containing "network id" (the header) are skipped.
    """
    networks = []
    for line in self.request("LIST_NETWORKS").splitlines():
        if "network id" in line:
            continue
        net_id, ssid, bssid, flags = line.split('\t')
        networks.append({'id': net_id, 'ssid': ssid,
                         'bssid': bssid, 'flags': flags})
    return networks
180
def hs20_enable(self):
    """Set the interworking and hs20 parameters to 1."""
    for cmd in ("SET interworking 1", "SET hs20 1"):
        self.request(cmd)
184
def add_cred(self):
    """Send ADD_CRED and return the new credential id as an int.

    Raises Exception when the reply contains "FAIL".
    """
    reply = self.request("ADD_CRED")
    if "FAIL" in reply:
        raise Exception("ADD_CRED failed")
    return int(reply)
190
def remove_cred(self, id):
    """Send REMOVE_CRED for id; raise Exception on a FAIL reply."""
    reply = self.request("REMOVE_CRED " + str(id))
    if "FAIL" in reply:
        raise Exception("REMOVE_CRED failed")
    return None
196
def set_cred(self, id, field, value):
    """Send SET_CRED with value passed through unquoted.

    Raises Exception when the reply contains "FAIL".
    """
    cmd = "SET_CRED " + str(id) + " " + field + " " + value
    if "FAIL" in self.request(cmd):
        raise Exception("SET_CRED failed")
    return None
202
def set_cred_quoted(self, id, field, value):
    """Send SET_CRED with value wrapped in double quotes.

    Raises Exception when the reply contains "FAIL".
    """
    cmd = "SET_CRED " + str(id) + " " + field + ' "' + value + '"'
    if "FAIL" in self.request(cmd):
        raise Exception("SET_CRED failed")
    return None
208
def get_cred(self, id, field):
    """Return the raw GET_CRED reply for field of credential id."""
    cmd = "GET_CRED " + str(id) + " " + field
    return self.request(cmd)
211
def add_cred_values(self, params):
    """Create a credential and set every known field present in params.

    Fields in the first table are set with set_cred_quoted(), fields in
    the second with set_cred(); keys of params outside both tables are
    ignored. Returns the new credential id.
    """
    cred = self.add_cred()

    quoted_fields = ("realm", "username", "password", "domain", "imsi",
                     "excluded_ssid", "milenage", "ca_cert", "client_cert",
                     "private_key", "domain_suffix_match", "provisioning_sp",
                     "roaming_partner", "phase1", "phase2")
    plain_fields = ("eap", "roaming_consortium", "priority",
                    "required_roaming_consortium", "sp_priority",
                    "max_bss_load", "update_identifier", "req_conn_capab",
                    "min_dl_bandwidth_home", "min_ul_bandwidth_home",
                    "min_dl_bandwidth_roaming", "min_ul_bandwidth_roaming")

    for name in quoted_fields:
        if name in params:
            self.set_cred_quoted(cred, name, params[name])
    for name in plain_fields:
        if name in params:
            self.set_cred(cred, name, params[name])

    return cred
233
def select_network(self, id, freq=None):
    """Send SELECT_NETWORK for id, optionally with a freq= argument.

    freq is concatenated as-is, so it must be a string when given.
    Raises Exception when the reply contains "FAIL".
    """
    extra = (" freq=" + freq) if freq else ""
    reply = self.request("SELECT_NETWORK " + str(id) + extra)
    if "FAIL" in reply:
        raise Exception("SELECT_NETWORK failed")
    return None
243
def connect_network(self, id, timeout=10):
    """Select network id and wait for CTRL-EVENT-CONNECTED.

    Pending events are drained before selecting and after connecting.
    Raises Exception when no connection event arrives within timeout.
    """
    self.dump_monitor()
    self.select_network(id)
    if self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout) is None:
        raise Exception("Association with the AP timed out")
    self.dump_monitor()
251
def get_status(self, extra=None):
    """Return the STATUS reply parsed into a dict of name -> value.

    extra: optional suffix; when given the command is "STATUS-<extra>".

    Each line is split at the first '='. Lines without '=' are logged and
    skipped.
    """
    cmd = "STATUS-" + extra if extra else "STATUS"
    vals = dict()
    for line in self.request(cmd).splitlines():
        try:
            name, value = line.split('=', 1)
        except ValueError:
            logger.info(self.ifname + ": Ignore unexpected STATUS line: " + line)
            continue
        vals[name] = value
    return vals
267
def get_status_field(self, field, extra=None):
    """Return one field from get_status(extra), or None if absent."""
    return self.get_status(extra).get(field)
273
def get_group_status(self, extra=None):
    """Return the group interface STATUS reply parsed into a dict.

    extra: optional suffix; when given the command is "STATUS-<extra>".

    Each line is split at the first '='; a line without '=' raises
    ValueError.
    """
    suffix = ("-" + extra) if extra else ""
    reply = self.group_request("STATUS" + suffix)
    return dict(line.split('=', 1) for line in reply.splitlines())
286
def get_group_status_field(self, field, extra=None):
    """Return one field from get_group_status(extra), or None if absent."""
    return self.get_group_status(extra).get(field)
292
def get_driver_status(self):
    """Return the STATUS-DRIVER reply parsed into a dict.

    Each line is split at the first '='; a line without '=' raises
    ValueError.
    """
    reply = self.request("STATUS-DRIVER")
    return dict(line.split('=', 1) for line in reply.splitlines())
301
def get_driver_status_field(self, field):
    """Return one field from get_driver_status(), or None if absent."""
    return self.get_driver_status().get(field)
307
def get_mib(self):
    """Return the MIB reply parsed into a dict of name -> value.

    Each line is split at the first '='. Lines without '=' are logged and
    skipped.
    """
    vals = dict()
    for line in self.request("MIB").splitlines():
        try:
            name, value = line.split('=', 1)
        except ValueError:
            logger.info(self.ifname + ": Ignore unexpected MIB line: " + line)
            continue
        vals[name] = value
    return vals
319
def p2p_dev_addr(self):
    """Return the p2p_device_address field of STATUS (None if absent)."""
    field = "p2p_device_address"
    return self.get_status_field(field)
322
def p2p_interface_addr(self):
    """Return the address field of the group STATUS (None if absent)."""
    field = "address"
    return self.get_group_status_field(field)
325
def p2p_listen(self):
    """Send P2P_LISTEN via global_request() and return its result."""
    reply = self.global_request("P2P_LISTEN")
    return reply
328
def p2p_find(self, social=False, dev_id=None, dev_type=None):
    """Send P2P_FIND with the requested optional arguments.

    social: add "type=social" when truthy.
    dev_id / dev_type: added as "dev_id=..." / "dev_type=..." when truthy.

    Returns the result of global_request().
    """
    parts = ["P2P_FIND"]
    if social:
        parts.append("type=social")
    if dev_id:
        parts.append("dev_id=" + dev_id)
    if dev_type:
        parts.append("dev_type=" + dev_type)
    return self.global_request(" ".join(parts))
338
def p2p_stop_find(self):
    """Send P2P_STOP_FIND via global_request() and return its result."""
    reply = self.global_request("P2P_STOP_FIND")
    return reply
341
def wps_read_pin(self):
    """Fetch a PIN with "WPS_PIN get", store it in self.pin and return it.

    Trailing newlines are stripped. Raises Exception when the reply
    contains "FAIL" (self.pin then holds the failing reply).
    """
    pin = self.request("WPS_PIN get").rstrip("\n")
    self.pin = pin
    if "FAIL" in pin:
        raise Exception("Could not generate PIN")
    return pin
347
def peer_known(self, peer, full=True):
    """Report whether the P2P_PEER reply mentions peer.

    The address comparison is case-insensitive. With full=True the reply
    must additionally not contain "[PROBE_REQ_ONLY]".
    """
    reply = self.global_request("P2P_PEER " + peer)
    if peer.lower() not in reply.lower():
        return False
    return (not full) or "[PROBE_REQ_ONLY]" not in reply
355
def discover_peer(self, peer, full=True, timeout=15, social=True, force_find=False):
    """Try to make peer known, starting a P2P find when needed.

    Returns True immediately when the peer is already known and
    force_find is false. Otherwise p2p_find(social) is started and
    peer_known() is polled once per second until timeout seconds have
    been counted. Returns False when the peer never shows up.
    """
    logger.info(self.ifname + ": Trying to discover peer " + peer)
    already_known = (not force_find) and self.peer_known(peer, full)
    if already_known:
        return True
    self.p2p_find(social)
    elapsed = 0
    while elapsed < timeout:
        time.sleep(1)
        elapsed += 1
        if self.peer_known(peer, full):
            return True
    return False
368
def get_peer(self, peer):
    """Return the name=value lines of the P2P_PEER reply as a dict.

    Lines without '=' are ignored. Raises Exception when the reply does
    not contain the peer address (case-insensitive).
    """
    reply = self.global_request("P2P_PEER " + peer)
    if peer.lower() not in reply.lower():
        raise Exception("Peer information not available")
    return dict(line.split('=', 1)
                for line in reply.splitlines() if '=' in line)
380
def group_form_result(self, ev, expect_failure=False, go_neg_res=None):
    """Parse a group formation event into a result dict.

    ev: event string (P2P-GROUP-STARTED, or P2P-GO-NEG-FAILURE when
        expect_failure is set).
    expect_failure: when true, a P2P-GROUP-STARTED event is an error and
        the result is {'result': 'go-neg-failed', 'status': int}, or None
        if ev is not a parsable P2P-GO-NEG-FAILURE event.
    go_neg_res: optional P2P-GO-NEG-SUCCESS event; when given, its role
        and freq are added as 'go_neg_role' and 'go_neg_freq'.

    On success the dict has 'result', 'ifname', 'role', 'ssid', 'freq',
    'persistent', 'go_dev_addr', one of 'psk'/'passphrase' when present,
    and the IP fields when the event carries them. self.group_ifname is
    set to the group interface name.

    Raises Exception when an expected event is missing or unparsable.
    """
    if expect_failure:
        if "P2P-GROUP-STARTED" in ev:
            raise Exception("Group formation succeeded when expecting failure")
        m = re.search(r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)', ev)
        if m is None:
            return None
        return {'result': 'go-neg-failed', 'status': int(m.group(2))}

    if "P2P-GROUP-STARTED" not in ev:
        raise Exception("No P2P-GROUP-STARTED event seen")

    base = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)'
    ip_part = r' ip_addr=([0-9.]*) ip_mask=([0-9.]*) go_ip_addr=([0-9.]*)'
    # Try the form with IP address information first, then the short one.
    # Using match groups (rather than re.split) keeps any text trailing the
    # match, such as " [PERSISTENT]", from being mistaken for ip_addr.
    m = re.search(base + ip_part, ev)
    has_ip = m is not None
    if m is None:
        m = re.search(base, ev)
        if m is None:
            raise Exception("Could not parse P2P-GROUP-STARTED")

    res = {}
    res['result'] = 'success'
    res['ifname'] = m.group(2)
    self.group_ifname = m.group(2)
    res['role'] = m.group(3)
    res['ssid'] = m.group(4)
    res['freq'] = m.group(5)
    res['persistent'] = "[PERSISTENT]" in ev
    key = m.group(6)
    p = re.match(r'psk=([0-9a-f]*)', key)
    if p:
        res['psk'] = p.group(1)
    p = re.match(r'passphrase="(.*)"', key)
    if p:
        res['passphrase'] = p.group(1)
    res['go_dev_addr'] = m.group(7)

    if has_ip:
        if len(m.group(8)) > 0:
            res['ip_addr'] = m.group(8)
        res['ip_mask'] = m.group(9)
        res['go_ip_addr'] = m.group(10)

    if go_neg_res:
        n = re.search(r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)',
                      go_neg_res)
        if n is None:
            raise Exception("Could not parse P2P-GO-NEG-SUCCESS")
        res['go_neg_role'] = n.group(2)
        res['go_neg_freq'] = n.group(3)

    return res
439
def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False, freq=None):
    """Send "P2P_CONNECT ... auth" for peer after discovering it.

    go_intent and freq are appended as "go_intent=N" / "freq=N" when
    truthy; persistent appends "persistent".

    Raises Exception when the peer is not found or the reply lacks "OK".
    """
    if not self.discover_peer(peer):
        raise Exception("Peer " + peer + " not found")
    self.dump_monitor()
    parts = ["P2P_CONNECT", peer, pin, method, "auth"]
    if go_intent:
        parts.append('go_intent=' + str(go_intent))
    if freq:
        parts.append('freq=' + str(freq))
    if persistent:
        parts.append("persistent")
    if "OK" not in self.global_request(" ".join(parts)):
        raise Exception("P2P_CONNECT (auth) failed")
    return None
454
def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
    """Wait for the GO negotiation outcome and return the parsed result.

    Waits for P2P-GO-NEG-SUCCESS or P2P-GO-NEG-FAILURE; after a success
    it also waits for P2P-GROUP-STARTED. The last event is handed to
    group_form_result() together with the success event, if any.

    Returns None on timeout when expect_failure is set; otherwise a
    timeout raises Exception.
    """
    negotiated = None
    event = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
                                    "P2P-GO-NEG-FAILURE"], timeout)
    if event is not None and "P2P-GO-NEG-SUCCESS" in event:
        negotiated = event
        event = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
    if event is None:
        if expect_failure:
            return None
        raise Exception("Group formation timed out")
    self.dump_monitor()
    return self.group_form_result(event, expect_failure, negotiated)
472
def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False, persistent=False, freq=None, provdisc=False):
    """Send P2P_CONNECT for peer and optionally wait for the result.

    pin is omitted from the command when falsy. go_intent and freq are
    appended as "go_intent=N" / "freq=N" when truthy; persistent and
    provdisc append their keyword.

    With timeout == 0 the method returns None right after the command is
    accepted. Otherwise it waits for the negotiation and group start
    events and returns group_form_result() of the last event. A timeout
    returns None when expect_failure is set and raises otherwise.

    Raises Exception when the peer is not found or the reply lacks "OK".
    """
    if not self.discover_peer(peer):
        raise Exception("Peer " + peer + " not found")
    self.dump_monitor()
    parts = ["P2P_CONNECT", peer]
    if pin:
        parts.append(pin)
    parts.append(method)
    if go_intent:
        parts.append('go_intent=' + str(go_intent))
    if freq:
        parts.append('freq=' + str(freq))
    if persistent:
        parts.append("persistent")
    if provdisc:
        parts.append("provdisc")
    if "OK" not in self.global_request(" ".join(parts)):
        raise Exception("P2P_CONNECT failed")
    if timeout == 0:
        self.dump_monitor()
        return None

    negotiated = None
    event = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
                                    "P2P-GO-NEG-FAILURE"], timeout)
    if event is not None and "P2P-GO-NEG-SUCCESS" in event:
        negotiated = event
        event = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
    if event is None:
        if expect_failure:
            return None
        raise Exception("Group formation timed out")
    self.dump_monitor()
    return self.group_form_result(event, expect_failure, negotiated)
510
def wait_event(self, events, timeout=10):
    """Wait for a monitor event containing any of the given substrings.

    events: list of substrings to look for.
    timeout: number of seconds to wait, measured with os.times()[4].

    Every received event is logged. Returns the first matching event
    string, or None when the time runs out or no further event arrives.
    """
    deadline = os.times()[4] + timeout
    while True:
        while self.mon.pending():
            msg = self.mon.recv()
            logger.debug(self.ifname + ": " + msg)
            if any(name in msg for name in events):
                return msg
        remaining = deadline - os.times()[4]
        if remaining <= 0 or not self.mon.pending(timeout=remaining):
            return None
527
def wait_global_event(self, events, timeout):
    """Wait for an event on the global monitor connection.

    events: list of substrings to look for.
    timeout: number of seconds to wait, measured with os.times()[4].

    Without a global interface this defers to wait_event() and returns
    its result. Returns the first matching event string, or None when the
    time runs out or no further event arrives.
    """
    if self.global_iface is None:
        # Propagate the event; callers rely on the return value.
        return self.wait_event(events, timeout)

    start = os.times()[4]
    while True:
        while self.global_mon.pending():
            ev = self.global_mon.recv()
            # ifname may be unset on an instance that only has the global
            # interface, so do not assume it is a string.
            logger.debug(str(self.ifname) + "(global): " + ev)
            for event in events:
                if event in ev:
                    return ev
        now = os.times()[4]
        remaining = start + timeout - now
        if remaining <= 0:
            break
        if not self.global_mon.pending(timeout=remaining):
            break
    return None
547
def wait_go_ending_session(self):
    """Wait up to 3 seconds for a group removal with GO_ENDING_SESSION.

    Raises Exception when no P2P-GROUP-REMOVED event arrives or when the
    event does not contain "reason=GO_ENDING_SESSION".
    """
    event = self.wait_event(["P2P-GROUP-REMOVED"], timeout=3)
    if event is None:
        raise Exception("Group removal event timed out")
    if "reason=GO_ENDING_SESSION" in event:
        return
    raise Exception("Unexpected group removal reason")
554
def dump_monitor(self):
    """Drain and log all pending events on both monitor connections.

    A monitor connection that is missing or None (no interface bound, or
    no global interface configured) is skipped.
    """
    mon = getattr(self, "mon", None)
    while mon is not None and mon.pending():
        ev = mon.recv()
        logger.debug(str(self.ifname) + ": " + ev)
    # global_mon only exists when a global interface was configured, so it
    # must not be accessed unconditionally.
    global_mon = getattr(self, "global_mon", None)
    while global_mon is not None and global_mon.pending():
        ev = global_mon.recv()
        logger.debug(str(self.ifname) + "(global): " + ev)
562
def remove_group(self, ifname=None):
    """Send P2P_GROUP_REMOVE and clear the recorded group interface.

    ifname: interface to remove; defaults to the recorded group interface
        when set, else the main interface.

    Raises Exception when the reply does not contain "OK".
    """
    if ifname is None:
        ifname = self.group_ifname or self.ifname
    reply = self.global_request("P2P_GROUP_REMOVE " + ifname)
    if "OK" not in reply:
        raise Exception("Group could not be removed")
    self.group_ifname = None
569
def p2p_start_go(self, persistent=None, freq=None):
    """Send P2P_GROUP_ADD and wait up to 5 seconds for the group to start.

    persistent: None adds nothing, True adds "persistent", any other
        value adds "persistent=<value>".
    freq: appended as "freq=<value>" when truthy.

    Returns group_form_result() of the P2P-GROUP-STARTED event. Raises
    Exception when the command is rejected or the event does not arrive.
    """
    self.dump_monitor()
    cmd = "P2P_GROUP_ADD"
    if persistent is True:
        cmd += " persistent"
    elif persistent is not None:
        cmd += " persistent=" + str(persistent)
    if freq:
        cmd += " freq=" + str(freq)
    if "OK" not in self.global_request(cmd):
        raise Exception("P2P_GROUP_ADD failed")
    started = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5)
    if started is None:
        raise Exception("GO start up timed out")
    self.dump_monitor()
    return self.group_form_result(started)
588
def p2p_go_authorize_client(self, pin):
    """Send "WPS_PIN any <pin>" on the group interface.

    Raises Exception when the reply contains "FAIL".
    """
    reply = self.group_request("WPS_PIN any " + pin)
    if "FAIL" in reply:
        raise Exception("Failed to authorize client connection on GO")
    return None
594
def p2p_go_authorize_client_pbc(self):
    """Send WPS_PBC on the group interface.

    Raises Exception when the reply contains "FAIL".
    """
    reply = self.group_request("WPS_PBC")
    if "FAIL" in reply:
        raise Exception("Failed to authorize client connection on GO")
    return None
600
def p2p_connect_group(self, go_addr, pin, timeout=0, social=False):
    """Join the group of the GO at go_addr with "P2P_CONNECT ... join".

    With timeout == 0 the method returns None right after the command is
    accepted; otherwise it waits for P2P-GROUP-STARTED and returns
    group_form_result() of that event.

    Raises Exception when the GO is not found, the reply lacks "OK", or
    the wait times out.
    """
    self.dump_monitor()
    if not self.discover_peer(go_addr, social=social):
        raise Exception("GO " + go_addr + " not found")
    self.dump_monitor()
    reply = self.global_request("P2P_CONNECT " + go_addr + " " + pin + " join")
    if "OK" not in reply:
        raise Exception("P2P_CONNECT(join) failed")
    if timeout == 0:
        self.dump_monitor()
        return None
    started = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
    if started is None:
        raise Exception("Joining the group timed out")
    self.dump_monitor()
    return self.group_form_result(started)
617
def tdls_setup(self, peer):
    """Send TDLS_SETUP for peer via group_request().

    Raises Exception when the reply contains "FAIL".
    """
    reply = self.group_request("TDLS_SETUP " + peer)
    if "FAIL" in reply:
        raise Exception("Failed to request TDLS setup")
    return None
623
def tdls_teardown(self, peer):
    """Send TDLS_TEARDOWN for peer via group_request().

    Raises Exception when the reply contains "FAIL".
    """
    reply = self.group_request("TDLS_TEARDOWN " + peer)
    if "FAIL" in reply:
        raise Exception("Failed to request TDLS teardown")
    return None
629
def connect(self, ssid=None, ssid2=None, **kwargs):
    """Add a network from keyword parameters and start connecting to it.

    ssid: SSID set as a quoted string.
    ssid2: SSID value set without quoting; used only when ssid is falsy.
    kwargs: network fields. Names in the first table below are set
        quoted, names in the second unquoted; falsy values are skipped.
        Additional keys handled here: raw_psk, password_hex, peerkey,
        okc, ocsp, only_add_network and wait_connect.

    Returns the network id. Unless only_add_network is set, the network
    is selected; when wait_connect is absent or truthy the method also
    waits for the connection (20 second timeout if "eap" was passed).
    """
    logger.info("Connect STA " + self.ifname + " to AP")
    net = self.add_network()
    if ssid:
        self.set_network_quoted(net, "ssid", ssid)
    elif ssid2:
        self.set_network(net, "ssid", ssid2)

    quoted_fields = ("psk", "identity", "anonymous_identity", "password",
                     "ca_cert", "client_cert", "private_key",
                     "private_key_passwd", "ca_cert2", "client_cert2",
                     "private_key2", "phase1", "phase2", "domain_suffix_match",
                     "altsubject_match", "subject_match", "pac_file", "dh_file")
    plain_fields = ("proto", "key_mgmt", "ieee80211w", "pairwise",
                    "group", "wep_key0", "scan_freq", "eap",
                    "eapol_flags", "fragment_size", "scan_ssid", "auth_alg",
                    "wpa_ptk_rekey", "disable_ht", "disable_vht", "bssid")
    for name in quoted_fields:
        value = kwargs.get(name)
        if value:
            self.set_network_quoted(net, name, value)
    for name in plain_fields:
        value = kwargs.get(name)
        if value:
            self.set_network(net, name, value)

    if kwargs.get('raw_psk'):
        self.set_network(net, "psk", kwargs['raw_psk'])
    if kwargs.get('password_hex'):
        self.set_network(net, "password", kwargs['password_hex'])
    if kwargs.get('peerkey'):
        self.set_network(net, "peerkey", "1")
    if kwargs.get('okc'):
        self.set_network(net, "proactive_key_caching", "1")
    if kwargs.get('ocsp'):
        self.set_network(net, "ocsp", str(kwargs['ocsp']))
    if kwargs.get('only_add_network'):
        return net

    if kwargs.get('wait_connect', True):
        # The longer timeout is used whenever the "eap" key was passed,
        # regardless of its value.
        if "eap" in kwargs:
            self.connect_network(net, timeout=20)
        else:
            self.connect_network(net)
    else:
        self.dump_monitor()
        self.select_network(net)
    return net
676
def scan(self, type=None, freq=None, no_wait=False):
    """Trigger a scan and, unless no_wait is set, wait for the results.

    type: appended as "TYPE=<type>" when truthy.
    freq: appended as "freq=<freq>" when truthy; must be a string.
    no_wait: when true, neither drain pending events first nor wait for
        CTRL-EVENT-SCAN-RESULTS afterwards.

    Raises Exception when the command is not accepted or no results event
    arrives within 15 seconds.
    """
    cmd = ("SCAN TYPE=" + type) if type else "SCAN"
    if freq:
        cmd += " freq=" + freq
    if not no_wait:
        self.dump_monitor()
    if "OK" not in self.request(cmd):
        raise Exception("Failed to trigger scan")
    if no_wait:
        return
    if self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
693
def roam(self, bssid, fail_test=False):
    """Send ROAM for bssid and check the connection outcome.

    fail_test: when true, wait 1 second and raise if a connection event
        is seen; otherwise wait up to 10 seconds and raise if none is.
    """
    self.dump_monitor()
    self.request("ROAM " + bssid)
    wait_time = 1 if fail_test else 10
    connected = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=wait_time)
    if fail_test:
        if connected is not None:
            raise Exception("Unexpected connection")
    elif connected is None:
        raise Exception("Roaming with the AP timed out")
    self.dump_monitor()
707
def roam_over_ds(self, bssid, fail_test=False):
    """Send FT_DS for bssid and check the connection outcome.

    fail_test: when true, wait 1 second and raise if a connection event
        is seen; otherwise wait up to 10 seconds and raise if none is.
    """
    self.dump_monitor()
    self.request("FT_DS " + bssid)
    wait_time = 1 if fail_test else 10
    connected = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=wait_time)
    if fail_test:
        if connected is not None:
            raise Exception("Unexpected connection")
    elif connected is None:
        raise Exception("Roaming with the AP timed out")
    self.dump_monitor()
721
def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None,
            new_passphrase=None, no_wait=False):
    """Send WPS_REG for the AP at bssid and wait for the outcome.

    new_ssid: when truthy, the command also carries new_ssid, key_mgmt,
        cipher and new_passphrase (SSID and passphrase hex-encoded), and
        the method waits for WPS-SUCCESS. Otherwise it waits for
        WPS-CRED-RECEIVED followed by WPS-FAIL.
    no_wait: return right after sending the command.

    In both cases CTRL-EVENT-CONNECTED is awaited last. Each wait uses a
    15 second timeout and raises Exception when it expires.
    """
    def to_hex(value):
        # Replaces the Python 2-only str.encode("hex"): hexlify needs
        # bytes and returns bytes on Python 3.
        if not isinstance(value, bytes):
            value = value.encode("utf-8")
        encoded = binascii.hexlify(value)
        return encoded if isinstance(encoded, str) else encoded.decode("ascii")

    self.dump_monitor()
    if new_ssid:
        self.request("WPS_REG " + bssid + " " + pin + " " +
                     to_hex(new_ssid) + " " + key_mgmt + " " +
                     cipher + " " + to_hex(new_passphrase))
        if no_wait:
            return
        ev = self.wait_event(["WPS-SUCCESS"], timeout=15)
    else:
        self.request("WPS_REG " + bssid + " " + pin)
        if no_wait:
            return
        ev = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15)
        if ev is None:
            raise Exception("WPS cred timed out")
        ev = self.wait_event(["WPS-FAIL"], timeout=15)
    if ev is None:
        raise Exception("WPS timed out")
    ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
    if ev is None:
        raise Exception("Association with the AP timed out")
745
def relog(self):
    """Send the RELOG command; the reply is discarded."""
    cmd = "RELOG"
    self.request(cmd)
748
def wait_completed(self, timeout=10):
    """Poll wpa_state every 0.5 seconds until it reads COMPLETED.

    timeout: number of seconds to keep polling (timeout * 2 polls).

    Raises Exception when the state is never reached.
    """
    polls = timeout * 2
    for _ in range(0, polls):
        state = self.get_status_field("wpa_state")
        if state == "COMPLETED":
            return
        time.sleep(0.5)
    raise Exception("Timeout while waiting for COMPLETED state")
755
def get_capability(self, field):
    """Return the GET_CAPABILITY reply split on spaces, or None on FAIL."""
    reply = self.request("GET_CAPABILITY " + field)
    return None if "FAIL" in reply else reply.split(' ')
761
def get_bss(self, bssid):
    """Return the "BSS <bssid>" reply parsed into a dict.

    Each line is split at the first '='; a line without '=' raises
    ValueError.
    """
    reply = self.request("BSS " + bssid)
    return dict(line.split('=', 1) for line in reply.splitlines())
770
def get_pmksa(self, bssid):
    """Return the first PMKSA entry whose line contains bssid.

    The matching line must consist of exactly five space-separated
    fields. The result maps 'index', 'pmkid', 'expiration' and
    'opportunistic' to the corresponding strings. Returns None when no
    line mentions bssid.
    """
    for line in self.request("PMKSA").splitlines():
        if bssid in line:
            index, aa, pmkid, expiration, opportunistic = line.split(' ')
            return {'index': index,
                    'pmkid': pmkid,
                    'expiration': expiration,
                    'opportunistic': opportunistic}
    return None
785
def get_sta(self, addr, info=None, next=False):
    """Query station information and return it as a dict.

    addr: station address; None sends STA-FIRST.
    info: optional extra argument appended after the address.
    next: send "STA-NEXT <addr>" instead of "STA <addr>".

    The first reply line is stored under 'addr'; every following line is
    split at the first '=' into name and value. An empty reply gives an
    empty dict.
    """
    if addr is None:
        cmd = "STA-FIRST"
    else:
        cmd = ("STA-NEXT " if next else "STA ") + addr
        if info:
            cmd = cmd + " " + info
    lines = self.request(cmd).splitlines()
    vals = dict()
    if lines:
        vals['addr'] = lines[0]
        for line in lines[1:]:
            name, value = line.split('=', 1)
            vals[name] = value
    return vals
805
def mgmt_rx(self, timeout=5):
    """Wait for a MGMT-RX event and decode the reported frame.

    The event is split on spaces: item 1 must be "freq=<value>" and
    item 4 is the frame as a hex string. The first 24 bytes are unpacked
    as little-endian frame control, duration, three 6-byte addresses and
    sequence control.

    Returns None on timeout, else a dict with 'freq', 'frame', 'fc',
    'subtype', 'duration', 'da', 'sa', 'bssid', 'seq_ctrl' and 'payload'.
    Raises Exception when item 1 is not the freq field.
    """
    ev = self.wait_event(["MGMT-RX"], timeout=timeout)
    if ev is None:
        return None
    items = ev.split(' ')
    field, val = items[1].split('=')
    if field != "freq":
        raise Exception("Unexpected MGMT-RX event format: " + ev)
    frame = binascii.unhexlify(items[4])

    fields = struct.unpack('<HH6B6B6BH', frame[0:24])
    mac_fmt = "%02x:%02x:%02x:%02x:%02x:%02x"
    frame_control = fields[0]

    msg = {}
    msg['freq'] = val
    msg['frame'] = frame
    msg['fc'] = frame_control
    # Bits 4-7 of the frame control field are reported as the subtype.
    msg['subtype'] = (frame_control >> 4) & 0xf
    msg['duration'] = fields[1]
    msg['da'] = mac_fmt % fields[2:8]
    msg['sa'] = mac_fmt % fields[8:14]
    msg['bssid'] = mac_fmt % fields[14:20]
    msg['seq_ctrl'] = fields[20]
    msg['payload'] = frame[24:]

    return msg