]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/wpasupplicant.py
tests: Fix ap_ft_reassoc_replay for case where wlantest has the PSK
[thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
1 # Python class for controlling wpa_supplicant
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 import binascii
11 import re
12 import struct
13 import wpaspy
14 import remotehost
15 import subprocess
16
17 logger = logging.getLogger()
18 wpas_ctrl = '/var/run/wpa_supplicant'
19
20 class WpaSupplicant:
21 def __init__(self, ifname=None, global_iface=None, hostname=None,
22 port=9877, global_port=9878, monitor=True):
23 self.monitor = monitor
24 self.hostname = hostname
25 self.group_ifname = None
26 self.global_mon = None
27 self.global_ctrl = None
28 self.gctrl_mon = None
29 self.ctrl = None
30 self.mon = None
31 self.ifname = None
32 self.host = remotehost.Host(hostname, ifname)
33 self._group_dbg = None
34 if ifname:
35 self.set_ifname(ifname, hostname, port)
36 res = self.get_driver_status()
37 if 'capa.flags' in res and int(res['capa.flags'], 0) & 0x20000000:
38 self.p2p_dev_ifname = 'p2p-dev-' + self.ifname
39 else:
40 self.p2p_dev_ifname = ifname
41
42 self.global_iface = global_iface
43 if global_iface:
44 if hostname != None:
45 self.global_ctrl = wpaspy.Ctrl(hostname, global_port)
46 if self.monitor:
47 self.global_mon = wpaspy.Ctrl(hostname, global_port)
48 self.global_dbg = hostname + "/" + str(global_port) + "/"
49 else:
50 self.global_ctrl = wpaspy.Ctrl(global_iface)
51 if self.monitor:
52 self.global_mon = wpaspy.Ctrl(global_iface)
53 self.global_dbg = ""
54 if self.monitor:
55 self.global_mon.attach()
56
57 def __del__(self):
58 self.close_monitor()
59 self.close_control()
60
61 def close_control_ctrl(self):
62 if self.ctrl:
63 del self.ctrl
64 self.ctrl = None
65
66 def close_control_global(self):
67 if self.global_ctrl:
68 del self.global_ctrl
69 self.global_ctrl = None
70
71 def close_control(self):
72 self.close_control_ctrl()
73 self.close_control_global()
74
75 def close_monitor_mon(self):
76 if not self.mon:
77 return
78 try:
79 while self.mon.pending():
80 ev = self.mon.recv()
81 logger.debug(self.dbg + ": " + ev)
82 except:
83 pass
84 try:
85 self.mon.detach()
86 except ConnectionRefusedError:
87 pass
88 except Exception as e:
89 if str(e) == "DETACH failed":
90 pass
91 else:
92 raise
93 del self.mon
94 self.mon = None
95
96 def close_monitor_global(self):
97 if not self.global_mon:
98 return
99 try:
100 while self.global_mon.pending():
101 ev = self.global_mon.recv()
102 logger.debug(self.global_dbg + ": " + ev)
103 except:
104 pass
105 try:
106 self.global_mon.detach()
107 except ConnectionRefusedError:
108 pass
109 except Exception as e:
110 if str(e) == "DETACH failed":
111 pass
112 else:
113 raise
114 del self.global_mon
115 self.global_mon = None
116
117 def close_monitor_group(self):
118 if not self.gctrl_mon:
119 return
120 try:
121 while self.gctrl_mon.pending():
122 ev = self.gctrl_mon.recv()
123 logger.debug(self.dbg + ": " + ev)
124 except:
125 pass
126 try:
127 self.gctrl_mon.detach()
128 except:
129 pass
130 del self.gctrl_mon
131 self.gctrl_mon = None
132
133 def close_monitor(self):
134 self.close_monitor_mon()
135 self.close_monitor_global()
136 self.close_monitor_group()
137
138 def cmd_execute(self, cmd_array, shell=False):
139 if self.hostname is None:
140 if shell:
141 cmd = ' '.join(cmd_array)
142 else:
143 cmd = cmd_array
144 proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
145 stdout=subprocess.PIPE, shell=shell)
146 out = proc.communicate()[0]
147 ret = proc.returncode
148 return ret, out.decode()
149 else:
150 return self.host.execute(cmd_array)
151
152 def terminate(self):
153 if self.global_mon:
154 self.close_monitor_global()
155 self.global_ctrl.terminate()
156 self.global_ctrl = None
157
158 def close_ctrl(self):
159 self.close_monitor_global()
160 self.close_control_global()
161 self.remove_ifname()
162
163 def set_ifname(self, ifname, hostname=None, port=9877):
164 self.remove_ifname()
165 self.ifname = ifname
166 if hostname != None:
167 self.ctrl = wpaspy.Ctrl(hostname, port)
168 if self.monitor:
169 self.mon = wpaspy.Ctrl(hostname, port)
170 self.host = remotehost.Host(hostname, ifname)
171 self.dbg = hostname + "/" + ifname
172 else:
173 self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
174 if self.monitor:
175 self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
176 self.dbg = ifname
177 if self.monitor:
178 self.mon.attach()
179
180 def remove_ifname(self):
181 self.close_monitor_mon()
182 self.close_control_ctrl()
183 self.ifname = None
184
185 def get_ctrl_iface_port(self, ifname):
186 if self.hostname is None:
187 return None
188
189 res = self.global_request("INTERFACES ctrl")
190 lines = res.splitlines()
191 found = False
192 for line in lines:
193 words = line.split()
194 if words[0] == ifname:
195 found = True
196 break
197 if not found:
198 raise Exception("Could not find UDP port for " + ifname)
199 res = line.find("ctrl_iface=udp:")
200 if res == -1:
201 raise Exception("Wrong ctrl_interface format")
202 words = line.split(":")
203 return int(words[1])
204
205 def interface_add(self, ifname, config="", driver="nl80211",
206 drv_params=None, br_ifname=None, create=False,
207 set_ifname=True, all_params=False, if_type=None):
208 status, groups = self.host.execute(["id"])
209 if status != 0:
210 group = "admin"
211 group = "admin" if "(admin)" in groups else "adm"
212 cmd = "INTERFACE_ADD " + ifname + "\t" + config + "\t" + driver + "\tDIR=/var/run/wpa_supplicant GROUP=" + group
213 if drv_params:
214 cmd = cmd + '\t' + drv_params
215 if br_ifname:
216 if not drv_params:
217 cmd += '\t'
218 cmd += '\t' + br_ifname
219 if create:
220 if not br_ifname:
221 cmd += '\t'
222 if not drv_params:
223 cmd += '\t'
224 cmd += '\tcreate'
225 if if_type:
226 cmd += '\t' + if_type
227 if all_params and not create:
228 if not br_ifname:
229 cmd += '\t'
230 if not drv_params:
231 cmd += '\t'
232 cmd += '\t'
233 if "FAIL" in self.global_request(cmd):
234 raise Exception("Failed to add a dynamic wpa_supplicant interface")
235 if not create and set_ifname:
236 port = self.get_ctrl_iface_port(ifname)
237 self.set_ifname(ifname, self.hostname, port)
238 res = self.get_driver_status()
239 if 'capa.flags' in res and int(res['capa.flags'], 0) & 0x20000000:
240 self.p2p_dev_ifname = 'p2p-dev-' + self.ifname
241 else:
242 self.p2p_dev_ifname = ifname
243
244 def interface_remove(self, ifname):
245 self.remove_ifname()
246 self.global_request("INTERFACE_REMOVE " + ifname)
247
248 def request(self, cmd, timeout=10):
249 logger.debug(self.dbg + ": CTRL: " + cmd)
250 return self.ctrl.request(cmd, timeout=timeout)
251
252 def global_request(self, cmd):
253 if self.global_iface is None:
254 return self.request(cmd)
255 else:
256 ifname = self.ifname or self.global_iface
257 logger.debug(self.global_dbg + ifname + ": CTRL(global): " + cmd)
258 return self.global_ctrl.request(cmd)
259
260 @property
261 def group_dbg(self):
262 if self._group_dbg is not None:
263 return self._group_dbg
264 if self.group_ifname is None:
265 raise Exception("Cannot have group_dbg without group_ifname")
266 if self.hostname is None:
267 self._group_dbg = self.group_ifname
268 else:
269 self._group_dbg = self.hostname + "/" + self.group_ifname
270 return self._group_dbg
271
272 def group_request(self, cmd):
273 if self.group_ifname and self.group_ifname != self.ifname:
274 if self.hostname is None:
275 gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
276 else:
277 port = self.get_ctrl_iface_port(self.group_ifname)
278 gctrl = wpaspy.Ctrl(self.hostname, port)
279 logger.debug(self.group_dbg + ": CTRL(group): " + cmd)
280 return gctrl.request(cmd)
281 return self.request(cmd)
282
283 def ping(self):
284 return "PONG" in self.request("PING")
285
286 def global_ping(self):
287 return "PONG" in self.global_request("PING")
288
289 def reset(self):
290 self.dump_monitor()
291 res = self.request("FLUSH")
292 if "OK" not in res:
293 logger.info("FLUSH to " + self.ifname + " failed: " + res)
294 self.global_request("REMOVE_NETWORK all")
295 self.global_request("SET p2p_no_group_iface 1")
296 self.global_request("P2P_FLUSH")
297 self.close_monitor_group()
298 self.group_ifname = None
299 self.dump_monitor()
300
301 iter = 0
302 while iter < 60:
303 state1 = self.get_driver_status_field("scan_state")
304 p2pdev = "p2p-dev-" + self.ifname
305 state2 = self.get_driver_status_field("scan_state", ifname=p2pdev)
306 states = str(state1) + " " + str(state2)
307 if "SCAN_STARTED" in states or "SCAN_REQUESTED" in states:
308 logger.info(self.ifname + ": Waiting for scan operation to complete before continuing")
309 time.sleep(1)
310 else:
311 break
312 iter = iter + 1
313 if iter == 60:
314 logger.error(self.ifname + ": Driver scan state did not clear")
315 print("Trying to clear cfg80211/mac80211 scan state")
316 status, buf = self.host.execute(["ifconfig", self.ifname, "down"])
317 if status != 0:
318 logger.info("ifconfig failed: " + buf)
319 logger.info(status)
320 status, buf = self.host.execute(["ifconfig", self.ifname, "up"])
321 if status != 0:
322 logger.info("ifconfig failed: " + buf)
323 logger.info(status)
324 if iter > 0:
325 # The ongoing scan could have discovered BSSes or P2P peers
326 logger.info("Run FLUSH again since scan was in progress")
327 self.request("FLUSH")
328 self.dump_monitor()
329
330 if not self.ping():
331 logger.info("No PING response from " + self.ifname + " after reset")
332
333 def set(self, field, value):
334 if "OK" not in self.request("SET " + field + " " + value):
335 raise Exception("Failed to set wpa_supplicant parameter " + field)
336
337 def add_network(self):
338 id = self.request("ADD_NETWORK")
339 if "FAIL" in id:
340 raise Exception("ADD_NETWORK failed")
341 return int(id)
342
343 def remove_network(self, id):
344 id = self.request("REMOVE_NETWORK " + str(id))
345 if "FAIL" in id:
346 raise Exception("REMOVE_NETWORK failed")
347 return None
348
349 def get_network(self, id, field):
350 res = self.request("GET_NETWORK " + str(id) + " " + field)
351 if res == "FAIL\n":
352 return None
353 return res
354
355 def set_network(self, id, field, value):
356 res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
357 if "FAIL" in res:
358 raise Exception("SET_NETWORK failed")
359 return None
360
361 def set_network_quoted(self, id, field, value):
362 res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
363 if "FAIL" in res:
364 raise Exception("SET_NETWORK failed")
365 return None
366
367 def p2pdev_request(self, cmd):
368 return self.global_request("IFNAME=" + self.p2p_dev_ifname + " " + cmd)
369
370 def p2pdev_add_network(self):
371 id = self.p2pdev_request("ADD_NETWORK")
372 if "FAIL" in id:
373 raise Exception("p2pdev ADD_NETWORK failed")
374 return int(id)
375
376 def p2pdev_set_network(self, id, field, value):
377 res = self.p2pdev_request("SET_NETWORK " + str(id) + " " + field + " " + value)
378 if "FAIL" in res:
379 raise Exception("p2pdev SET_NETWORK failed")
380 return None
381
382 def p2pdev_set_network_quoted(self, id, field, value):
383 res = self.p2pdev_request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
384 if "FAIL" in res:
385 raise Exception("p2pdev SET_NETWORK failed")
386 return None
387
388 def list_networks(self, p2p=False):
389 if p2p:
390 res = self.global_request("LIST_NETWORKS")
391 else:
392 res = self.request("LIST_NETWORKS")
393 lines = res.splitlines()
394 networks = []
395 for l in lines:
396 if "network id" in l:
397 continue
398 [id, ssid, bssid, flags] = l.split('\t')
399 network = {}
400 network['id'] = id
401 network['ssid'] = ssid
402 network['bssid'] = bssid
403 network['flags'] = flags
404 networks.append(network)
405 return networks
406
407 def hs20_enable(self, auto_interworking=False):
408 self.request("SET interworking 1")
409 self.request("SET hs20 1")
410 if auto_interworking:
411 self.request("SET auto_interworking 1")
412 else:
413 self.request("SET auto_interworking 0")
414
415 def interworking_add_network(self, bssid):
416 id = self.request("INTERWORKING_ADD_NETWORK " + bssid)
417 if "FAIL" in id or "OK" in id:
418 raise Exception("INTERWORKING_ADD_NETWORK failed")
419 return int(id)
420
421 def add_cred(self):
422 id = self.request("ADD_CRED")
423 if "FAIL" in id:
424 raise Exception("ADD_CRED failed")
425 return int(id)
426
427 def remove_cred(self, id):
428 id = self.request("REMOVE_CRED " + str(id))
429 if "FAIL" in id:
430 raise Exception("REMOVE_CRED failed")
431 return None
432
433 def set_cred(self, id, field, value):
434 res = self.request("SET_CRED " + str(id) + " " + field + " " + value)
435 if "FAIL" in res:
436 raise Exception("SET_CRED failed")
437 return None
438
439 def set_cred_quoted(self, id, field, value):
440 res = self.request("SET_CRED " + str(id) + " " + field + ' "' + value + '"')
441 if "FAIL" in res:
442 raise Exception("SET_CRED failed")
443 return None
444
445 def get_cred(self, id, field):
446 return self.request("GET_CRED " + str(id) + " " + field)
447
448 def add_cred_values(self, params):
449 id = self.add_cred()
450
451 quoted = ["realm", "username", "password", "domain", "imsi",
452 "excluded_ssid", "milenage", "ca_cert", "client_cert",
453 "private_key", "domain_suffix_match", "provisioning_sp",
454 "roaming_partner", "phase1", "phase2", "private_key_passwd",
455 "roaming_consortiums"]
456 for field in quoted:
457 if field in params:
458 self.set_cred_quoted(id, field, params[field])
459
460 not_quoted = ["eap", "roaming_consortium", "priority",
461 "required_roaming_consortium", "sp_priority",
462 "max_bss_load", "update_identifier", "req_conn_capab",
463 "min_dl_bandwidth_home", "min_ul_bandwidth_home",
464 "min_dl_bandwidth_roaming", "min_ul_bandwidth_roaming"]
465 for field in not_quoted:
466 if field in params:
467 self.set_cred(id, field, params[field])
468
469 return id
470
471 def select_network(self, id, freq=None):
472 if freq:
473 extra = " freq=" + str(freq)
474 else:
475 extra = ""
476 id = self.request("SELECT_NETWORK " + str(id) + extra)
477 if "FAIL" in id:
478 raise Exception("SELECT_NETWORK failed")
479 return None
480
481 def mesh_group_add(self, id):
482 id = self.request("MESH_GROUP_ADD " + str(id))
483 if "FAIL" in id:
484 raise Exception("MESH_GROUP_ADD failed")
485 return None
486
487 def mesh_group_remove(self):
488 id = self.request("MESH_GROUP_REMOVE " + str(self.ifname))
489 if "FAIL" in id:
490 raise Exception("MESH_GROUP_REMOVE failed")
491 return None
492
493 def connect_network(self, id, timeout=None):
494 if timeout is None:
495 timeout = 10 if self.hostname is None else 60
496 self.dump_monitor()
497 self.select_network(id)
498 self.wait_connected(timeout=timeout)
499 self.dump_monitor()
500
501 def get_status(self, extra=None):
502 if extra:
503 extra = "-" + extra
504 else:
505 extra = ""
506 res = self.request("STATUS" + extra)
507 lines = res.splitlines()
508 vals = dict()
509 for l in lines:
510 try:
511 [name, value] = l.split('=', 1)
512 vals[name] = value
513 except ValueError as e:
514 logger.info(self.ifname + ": Ignore unexpected STATUS line: " + l)
515 return vals
516
517 def get_status_field(self, field, extra=None):
518 vals = self.get_status(extra)
519 if field in vals:
520 return vals[field]
521 return None
522
523 def get_group_status(self, extra=None):
524 if extra:
525 extra = "-" + extra
526 else:
527 extra = ""
528 res = self.group_request("STATUS" + extra)
529 lines = res.splitlines()
530 vals = dict()
531 for l in lines:
532 try:
533 [name, value] = l.split('=', 1)
534 except ValueError:
535 logger.info(self.ifname + ": Ignore unexpected status line: " + l)
536 continue
537 vals[name] = value
538 return vals
539
540 def get_group_status_field(self, field, extra=None):
541 vals = self.get_group_status(extra)
542 if field in vals:
543 return vals[field]
544 return None
545
546 def get_driver_status(self, ifname=None):
547 if ifname is None:
548 res = self.request("STATUS-DRIVER")
549 else:
550 res = self.global_request("IFNAME=%s STATUS-DRIVER" % ifname)
551 if res.startswith("FAIL"):
552 return dict()
553 lines = res.splitlines()
554 vals = dict()
555 for l in lines:
556 try:
557 [name, value] = l.split('=', 1)
558 except ValueError:
559 logger.info(self.ifname + ": Ignore unexpected status-driver line: " + l)
560 continue
561 vals[name] = value
562 return vals
563
564 def get_driver_status_field(self, field, ifname=None):
565 vals = self.get_driver_status(ifname)
566 if field in vals:
567 return vals[field]
568 return None
569
570 def get_mcc(self):
571 mcc = int(self.get_driver_status_field('capa.num_multichan_concurrent'))
572 return 1 if mcc < 2 else mcc
573
574 def get_mib(self):
575 res = self.request("MIB")
576 lines = res.splitlines()
577 vals = dict()
578 for l in lines:
579 try:
580 [name, value] = l.split('=', 1)
581 vals[name] = value
582 except ValueError as e:
583 logger.info(self.ifname + ": Ignore unexpected MIB line: " + l)
584 return vals
585
586 def p2p_dev_addr(self):
587 return self.get_status_field("p2p_device_address")
588
589 def p2p_interface_addr(self):
590 return self.get_group_status_field("address")
591
592 def own_addr(self):
593 try:
594 res = self.p2p_interface_addr()
595 except:
596 res = self.p2p_dev_addr()
597 return res
598
599 def p2p_listen(self):
600 return self.global_request("P2P_LISTEN")
601
602 def p2p_ext_listen(self, period, interval):
603 return self.global_request("P2P_EXT_LISTEN %d %d" % (period, interval))
604
605 def p2p_cancel_ext_listen(self):
606 return self.global_request("P2P_EXT_LISTEN")
607
608 def p2p_find(self, social=False, progressive=False, dev_id=None,
609 dev_type=None, delay=None, freq=None):
610 cmd = "P2P_FIND"
611 if social:
612 cmd = cmd + " type=social"
613 elif progressive:
614 cmd = cmd + " type=progressive"
615 if dev_id:
616 cmd = cmd + " dev_id=" + dev_id
617 if dev_type:
618 cmd = cmd + " dev_type=" + dev_type
619 if delay:
620 cmd = cmd + " delay=" + str(delay)
621 if freq:
622 cmd = cmd + " freq=" + str(freq)
623 return self.global_request(cmd)
624
625 def p2p_stop_find(self):
626 return self.global_request("P2P_STOP_FIND")
627
628 def wps_read_pin(self):
629 self.pin = self.request("WPS_PIN get").rstrip("\n")
630 if "FAIL" in self.pin:
631 raise Exception("Could not generate PIN")
632 return self.pin
633
634 def peer_known(self, peer, full=True):
635 res = self.global_request("P2P_PEER " + peer)
636 if peer.lower() not in res.lower():
637 return False
638 if not full:
639 return True
640 return "[PROBE_REQ_ONLY]" not in res
641
642 def discover_peer(self, peer, full=True, timeout=15, social=True,
643 force_find=False, freq=None):
644 logger.info(self.ifname + ": Trying to discover peer " + peer)
645 if not force_find and self.peer_known(peer, full):
646 return True
647 self.p2p_find(social, freq=freq)
648 count = 0
649 while count < timeout * 4:
650 time.sleep(0.25)
651 count = count + 1
652 if self.peer_known(peer, full):
653 return True
654 return False
655
656 def get_peer(self, peer):
657 res = self.global_request("P2P_PEER " + peer)
658 if peer.lower() not in res.lower():
659 raise Exception("Peer information not available")
660 lines = res.splitlines()
661 vals = dict()
662 for l in lines:
663 if '=' in l:
664 [name, value] = l.split('=', 1)
665 vals[name] = value
666 return vals
667
668 def group_form_result(self, ev, expect_failure=False, go_neg_res=None):
669 if expect_failure:
670 if "P2P-GROUP-STARTED" in ev:
671 raise Exception("Group formation succeeded when expecting failure")
672 exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)'
673 s = re.split(exp, ev)
674 if len(s) < 3:
675 return None
676 res = {}
677 res['result'] = 'go-neg-failed'
678 res['status'] = int(s[2])
679 return res
680
681 if "P2P-GROUP-STARTED" not in ev:
682 raise Exception("No P2P-GROUP-STARTED event seen")
683
684 exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*) ip_addr=([0-9.]*) ip_mask=([0-9.]*) go_ip_addr=([0-9.]*)'
685 s = re.split(exp, ev)
686 if len(s) < 11:
687 exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)'
688 s = re.split(exp, ev)
689 if len(s) < 8:
690 raise Exception("Could not parse P2P-GROUP-STARTED")
691 res = {}
692 res['result'] = 'success'
693 res['ifname'] = s[2]
694 self.group_ifname = s[2]
695 try:
696 if self.hostname is None:
697 self.gctrl_mon = wpaspy.Ctrl(os.path.join(wpas_ctrl,
698 self.group_ifname))
699 else:
700 port = self.get_ctrl_iface_port(self.group_ifname)
701 self.gctrl_mon = wpaspy.Ctrl(self.hostname, port)
702 if self.monitor:
703 self.gctrl_mon.attach()
704 except:
705 logger.debug("Could not open monitor socket for group interface")
706 self.gctrl_mon = None
707 res['role'] = s[3]
708 res['ssid'] = s[4]
709 res['freq'] = s[5]
710 if "[PERSISTENT]" in ev:
711 res['persistent'] = True
712 else:
713 res['persistent'] = False
714 p = re.match(r'psk=([0-9a-f]*)', s[6])
715 if p:
716 res['psk'] = p.group(1)
717 p = re.match(r'passphrase="(.*)"', s[6])
718 if p:
719 res['passphrase'] = p.group(1)
720 res['go_dev_addr'] = s[7]
721
722 if len(s) > 8 and len(s[8]) > 0 and "[PERSISTENT]" not in s[8]:
723 res['ip_addr'] = s[8]
724 if len(s) > 9:
725 res['ip_mask'] = s[9]
726 if len(s) > 10:
727 res['go_ip_addr'] = s[10]
728
729 if go_neg_res:
730 exp = r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)'
731 s = re.split(exp, go_neg_res)
732 if len(s) < 4:
733 raise Exception("Could not parse P2P-GO-NEG-SUCCESS")
734 res['go_neg_role'] = s[2]
735 res['go_neg_freq'] = s[3]
736
737 return res
738
739 def p2p_go_neg_auth(self, peer, pin, method, go_intent=None,
740 persistent=False, freq=None, freq2=None,
741 max_oper_chwidth=None, ht40=False, vht=False):
742 if not self.discover_peer(peer):
743 raise Exception("Peer " + peer + " not found")
744 self.dump_monitor()
745 if pin:
746 cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth"
747 else:
748 cmd = "P2P_CONNECT " + peer + " " + method + " auth"
749 if go_intent:
750 cmd = cmd + ' go_intent=' + str(go_intent)
751 if freq:
752 cmd = cmd + ' freq=' + str(freq)
753 if freq2:
754 cmd = cmd + ' freq2=' + str(freq2)
755 if max_oper_chwidth:
756 cmd = cmd + ' max_oper_chwidth=' + str(max_oper_chwidth)
757 if ht40:
758 cmd = cmd + ' ht40'
759 if vht:
760 cmd = cmd + ' vht'
761 if persistent:
762 cmd = cmd + " persistent"
763 if "OK" in self.global_request(cmd):
764 return None
765 raise Exception("P2P_CONNECT (auth) failed")
766
767 def p2p_go_neg_auth_result(self, timeout=None, expect_failure=False):
768 if timeout is None:
769 timeout = 1 if expect_failure else 5
770 go_neg_res = None
771 ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
772 "P2P-GO-NEG-FAILURE"], timeout)
773 if ev is None:
774 if expect_failure:
775 return None
776 raise Exception("Group formation timed out")
777 if "P2P-GO-NEG-SUCCESS" in ev:
778 go_neg_res = ev
779 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
780 if ev is None:
781 if expect_failure:
782 return None
783 raise Exception("Group formation timed out")
784 self.dump_monitor()
785 return self.group_form_result(ev, expect_failure, go_neg_res)
786
787 def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None,
788 expect_failure=False, persistent=False,
789 persistent_id=None, freq=None, provdisc=False,
790 wait_group=True, freq2=None, max_oper_chwidth=None,
791 ht40=False, vht=False):
792 if not self.discover_peer(peer):
793 raise Exception("Peer " + peer + " not found")
794 self.dump_monitor()
795 if pin:
796 cmd = "P2P_CONNECT " + peer + " " + pin + " " + method
797 else:
798 cmd = "P2P_CONNECT " + peer + " " + method
799 if go_intent is not None:
800 cmd = cmd + ' go_intent=' + str(go_intent)
801 if freq:
802 cmd = cmd + ' freq=' + str(freq)
803 if freq2:
804 cmd = cmd + ' freq2=' + str(freq2)
805 if max_oper_chwidth:
806 cmd = cmd + ' max_oper_chwidth=' + str(max_oper_chwidth)
807 if ht40:
808 cmd = cmd + ' ht40'
809 if vht:
810 cmd = cmd + ' vht'
811 if persistent:
812 cmd = cmd + " persistent"
813 elif persistent_id:
814 cmd = cmd + " persistent=" + persistent_id
815 if provdisc:
816 cmd = cmd + " provdisc"
817 if "OK" in self.global_request(cmd):
818 if timeout == 0:
819 return None
820 go_neg_res = None
821 ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
822 "P2P-GO-NEG-FAILURE"], timeout)
823 if ev is None:
824 if expect_failure:
825 return None
826 raise Exception("Group formation timed out")
827 if "P2P-GO-NEG-SUCCESS" in ev:
828 if not wait_group:
829 return ev
830 go_neg_res = ev
831 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
832 if ev is None:
833 if expect_failure:
834 return None
835 raise Exception("Group formation timed out")
836 self.dump_monitor()
837 return self.group_form_result(ev, expect_failure, go_neg_res)
838 raise Exception("P2P_CONNECT failed")
839
840 def _wait_event(self, mon, pfx, events, timeout):
841 if not isinstance(events, list):
842 raise Exception("WpaSupplicant._wait_event() called with incorrect events argument type")
843 start = os.times()[4]
844 while True:
845 while mon.pending():
846 ev = mon.recv()
847 logger.debug(self.dbg + pfx + ev)
848 for event in events:
849 if event in ev:
850 return ev
851 now = os.times()[4]
852 remaining = start + timeout - now
853 if remaining <= 0:
854 break
855 if not mon.pending(timeout=remaining):
856 break
857 return None
858
859 def wait_event(self, events, timeout=10):
860 return self._wait_event(self.mon, ": ", events, timeout)
861
862 def wait_global_event(self, events, timeout):
863 if self.global_iface is None:
864 return self.wait_event(events, timeout)
865 return self._wait_event(self.global_mon, "(global): ",
866 events, timeout)
867
868 def wait_group_event(self, events, timeout=10):
869 if not isinstance(events, list):
870 raise Exception("WpaSupplicant.wait_group_event() called with incorrect events argument type")
871 if self.group_ifname and self.group_ifname != self.ifname:
872 if self.gctrl_mon is None:
873 return None
874 start = os.times()[4]
875 while True:
876 while self.gctrl_mon.pending():
877 ev = self.gctrl_mon.recv()
878 logger.debug(self.group_dbg + "(group): " + ev)
879 for event in events:
880 if event in ev:
881 return ev
882 now = os.times()[4]
883 remaining = start + timeout - now
884 if remaining <= 0:
885 break
886 if not self.gctrl_mon.pending(timeout=remaining):
887 break
888 return None
889
890 return self.wait_event(events, timeout)
891
892 def wait_go_ending_session(self):
893 self.close_monitor_group()
894 timeout = 3 if self.hostname is None else 10
895 ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=timeout)
896 if ev is None:
897 raise Exception("Group removal event timed out")
898 if "reason=GO_ENDING_SESSION" not in ev:
899 raise Exception("Unexpected group removal reason")
900
901 def dump_monitor(self, mon=True, global_mon=True):
902 count_iface = 0
903 count_global = 0
904 while mon and self.monitor and self.mon.pending():
905 ev = self.mon.recv()
906 logger.debug(self.dbg + ": " + ev)
907 count_iface += 1
908 while global_mon and self.monitor and self.global_mon and self.global_mon.pending():
909 ev = self.global_mon.recv()
910 logger.debug(self.global_dbg + self.ifname + "(global): " + ev)
911 count_global += 1
912 return (count_iface, count_global)
913
914 def remove_group(self, ifname=None):
915 self.close_monitor_group()
916 if ifname is None:
917 ifname = self.group_ifname if self.group_ifname else self.ifname
918 if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname):
919 raise Exception("Group could not be removed")
920 self.group_ifname = None
921
922 def p2p_start_go(self, persistent=None, freq=None, no_event_clear=False):
923 self.dump_monitor()
924 cmd = "P2P_GROUP_ADD"
925 if persistent is None:
926 pass
927 elif persistent is True:
928 cmd = cmd + " persistent"
929 else:
930 cmd = cmd + " persistent=" + str(persistent)
931 if freq:
932 cmd = cmd + " freq=" + str(freq)
933 if "OK" in self.global_request(cmd):
934 ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5)
935 if ev is None:
936 raise Exception("GO start up timed out")
937 if not no_event_clear:
938 self.dump_monitor()
939 return self.group_form_result(ev)
940 raise Exception("P2P_GROUP_ADD failed")
941
942 def p2p_go_authorize_client(self, pin):
943 cmd = "WPS_PIN any " + pin
944 if "FAIL" in self.group_request(cmd):
945 raise Exception("Failed to authorize client connection on GO")
946 return None
947
948 def p2p_go_authorize_client_pbc(self):
949 cmd = "WPS_PBC"
950 if "FAIL" in self.group_request(cmd):
951 raise Exception("Failed to authorize client connection on GO")
952 return None
953
954 def p2p_connect_group(self, go_addr, pin, timeout=0, social=False,
955 freq=None):
956 self.dump_monitor()
957 if not self.discover_peer(go_addr, social=social, freq=freq):
958 if social or not self.discover_peer(go_addr, social=social):
959 raise Exception("GO " + go_addr + " not found")
960 self.p2p_stop_find()
961 self.dump_monitor()
962 cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
963 if freq:
964 cmd += " freq=" + str(freq)
965 if "OK" in self.global_request(cmd):
966 if timeout == 0:
967 self.dump_monitor()
968 return None
969 ev = self.wait_global_event(["P2P-GROUP-STARTED",
970 "P2P-GROUP-FORMATION-FAILURE"],
971 timeout)
972 if ev is None:
973 raise Exception("Joining the group timed out")
974 if "P2P-GROUP-STARTED" not in ev:
975 raise Exception("Failed to join the group")
976 self.dump_monitor()
977 return self.group_form_result(ev)
978 raise Exception("P2P_CONNECT(join) failed")
979
980 def tdls_setup(self, peer):
981 cmd = "TDLS_SETUP " + peer
982 if "FAIL" in self.group_request(cmd):
983 raise Exception("Failed to request TDLS setup")
984 return None
985
986 def tdls_teardown(self, peer):
987 cmd = "TDLS_TEARDOWN " + peer
988 if "FAIL" in self.group_request(cmd):
989 raise Exception("Failed to request TDLS teardown")
990 return None
991
992 def tdls_link_status(self, peer):
993 cmd = "TDLS_LINK_STATUS " + peer
994 ret = self.group_request(cmd)
995 if "FAIL" in ret:
996 raise Exception("Failed to request TDLS link status")
997 return ret
998
999 def tspecs(self):
1000 """Return (tsid, up) tuples representing current tspecs"""
1001 res = self.request("WMM_AC_STATUS")
1002 tspecs = re.findall(r"TSID=(\d+) UP=(\d+)", res)
1003 tspecs = [tuple(map(int, tspec)) for tspec in tspecs]
1004
1005 logger.debug("tspecs: " + str(tspecs))
1006 return tspecs
1007
1008 def add_ts(self, tsid, up, direction="downlink", expect_failure=False,
1009 extra=None):
1010 params = {
1011 "sba": 9000,
1012 "nominal_msdu_size": 1500,
1013 "min_phy_rate": 6000000,
1014 "mean_data_rate": 1500,
1015 }
1016 cmd = "WMM_AC_ADDTS %s tsid=%d up=%d" % (direction, tsid, up)
1017 for (key, value) in params.items():
1018 cmd += " %s=%d" % (key, value)
1019 if extra:
1020 cmd += " " + extra
1021
1022 if self.request(cmd).strip() != "OK":
1023 raise Exception("ADDTS failed (tsid=%d up=%d)" % (tsid, up))
1024
1025 if expect_failure:
1026 ev = self.wait_event(["TSPEC-REQ-FAILED"], timeout=2)
1027 if ev is None:
1028 raise Exception("ADDTS failed (time out while waiting failure)")
1029 if "tsid=%d" % (tsid) not in ev:
1030 raise Exception("ADDTS failed (invalid tsid in TSPEC-REQ-FAILED")
1031 return
1032
1033 ev = self.wait_event(["TSPEC-ADDED"], timeout=1)
1034 if ev is None:
1035 raise Exception("ADDTS failed (time out)")
1036 if "tsid=%d" % (tsid) not in ev:
1037 raise Exception("ADDTS failed (invalid tsid in TSPEC-ADDED)")
1038
1039 if (tsid, up) not in self.tspecs():
1040 raise Exception("ADDTS failed (tsid not in tspec list)")
1041
1042 def del_ts(self, tsid):
1043 if self.request("WMM_AC_DELTS %d" % (tsid)).strip() != "OK":
1044 raise Exception("DELTS failed")
1045
1046 ev = self.wait_event(["TSPEC-REMOVED"], timeout=1)
1047 if ev is None:
1048 raise Exception("DELTS failed (time out)")
1049 if "tsid=%d" % (tsid) not in ev:
1050 raise Exception("DELTS failed (invalid tsid in TSPEC-REMOVED)")
1051
1052 tspecs = [(t, u) for (t, u) in self.tspecs() if t == tsid]
1053 if tspecs:
1054 raise Exception("DELTS failed (still in tspec list)")
1055
1056 def connect(self, ssid=None, ssid2=None, **kwargs):
1057 logger.info("Connect STA " + self.ifname + " to AP")
1058 id = self.add_network()
1059 if ssid:
1060 self.set_network_quoted(id, "ssid", ssid)
1061 elif ssid2:
1062 self.set_network(id, "ssid", ssid2)
1063
1064 quoted = ["psk", "identity", "anonymous_identity", "password",
1065 "machine_identity", "machine_password",
1066 "ca_cert", "client_cert", "private_key",
1067 "private_key_passwd", "ca_cert2", "client_cert2",
1068 "private_key2", "phase1", "phase2", "domain_suffix_match",
1069 "altsubject_match", "subject_match", "pac_file", "dh_file",
1070 "bgscan", "ht_mcs", "id_str", "openssl_ciphers",
1071 "domain_match", "dpp_connector", "sae_password",
1072 "sae_password_id", "check_cert_subject"]
1073 for field in quoted:
1074 if field in kwargs and kwargs[field]:
1075 self.set_network_quoted(id, field, kwargs[field])
1076
1077 not_quoted = ["proto", "key_mgmt", "ieee80211w", "pairwise",
1078 "group", "wep_key0", "wep_key1", "wep_key2", "wep_key3",
1079 "wep_tx_keyidx", "scan_freq", "freq_list", "eap",
1080 "eapol_flags", "fragment_size", "scan_ssid", "auth_alg",
1081 "wpa_ptk_rekey", "disable_ht", "disable_vht", "bssid",
1082 "disable_max_amsdu", "ampdu_factor", "ampdu_density",
1083 "disable_ht40", "disable_sgi", "disable_ldpc",
1084 "ht40_intolerant", "update_identifier", "mac_addr",
1085 "erp", "bg_scan_period", "bssid_blacklist",
1086 "bssid_whitelist", "mem_only_psk", "eap_workaround",
1087 "engine", "fils_dh_group", "bssid_hint",
1088 "dpp_csign", "dpp_csign_expiry",
1089 "dpp_netaccesskey", "dpp_netaccesskey_expiry",
1090 "group_mgmt", "owe_group",
1091 "roaming_consortium_selection", "ocv",
1092 "multi_ap_backhaul_sta", "rx_stbc", "tx_stbc",
1093 "ft_eap_pmksa_caching"]
1094 for field in not_quoted:
1095 if field in kwargs and kwargs[field]:
1096 self.set_network(id, field, kwargs[field])
1097
1098 if "raw_psk" in kwargs and kwargs['raw_psk']:
1099 self.set_network(id, "psk", kwargs['raw_psk'])
1100 if "password_hex" in kwargs and kwargs['password_hex']:
1101 self.set_network(id, "password", kwargs['password_hex'])
1102 if "peerkey" in kwargs and kwargs['peerkey']:
1103 self.set_network(id, "peerkey", "1")
1104 if "okc" in kwargs and kwargs['okc']:
1105 self.set_network(id, "proactive_key_caching", "1")
1106 if "ocsp" in kwargs and kwargs['ocsp']:
1107 self.set_network(id, "ocsp", str(kwargs['ocsp']))
1108 if "only_add_network" in kwargs and kwargs['only_add_network']:
1109 return id
1110 if "wait_connect" not in kwargs or kwargs['wait_connect']:
1111 if "eap" in kwargs:
1112 self.connect_network(id, timeout=20)
1113 else:
1114 self.connect_network(id)
1115 else:
1116 self.dump_monitor()
1117 self.select_network(id)
1118 return id
1119
1120 def scan(self, type=None, freq=None, no_wait=False, only_new=False,
1121 passive=False):
1122 if not no_wait:
1123 self.dump_monitor()
1124 if type:
1125 cmd = "SCAN TYPE=" + type
1126 else:
1127 cmd = "SCAN"
1128 if freq:
1129 cmd = cmd + " freq=" + str(freq)
1130 if only_new:
1131 cmd += " only_new=1"
1132 if passive:
1133 cmd += " passive=1"
1134 if not no_wait:
1135 self.dump_monitor()
1136 res = self.request(cmd)
1137 if "OK" not in res:
1138 raise Exception("Failed to trigger scan: " + str(res))
1139 if no_wait:
1140 return
1141 ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS",
1142 "CTRL-EVENT-SCAN-FAILED"], 15)
1143 if ev is None:
1144 raise Exception("Scan timed out")
1145 if "CTRL-EVENT-SCAN-FAILED" in ev:
1146 raise Exception("Scan failed: " + ev)
1147
1148 def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False,
1149 passive=False):
1150 if not force_scan and self.get_bss(bssid) is not None:
1151 return
1152 for i in range(0, 10):
1153 self.scan(freq=freq, type="ONLY", only_new=only_new,
1154 passive=passive)
1155 if self.get_bss(bssid) is not None:
1156 return
1157 raise Exception("Could not find BSS " + bssid + " in scan")
1158
1159 def flush_scan_cache(self, freq=2417):
1160 self.request("BSS_FLUSH 0")
1161 self.scan(freq=freq, only_new=True)
1162 res = self.request("SCAN_RESULTS")
1163 if len(res.splitlines()) > 1:
1164 logger.debug("Scan results remaining after first attempt to flush the results:\n" + res)
1165 self.request("BSS_FLUSH 0")
1166 self.scan(freq=2422, only_new=True)
1167 res = self.request("SCAN_RESULTS")
1168 if len(res.splitlines()) > 1:
1169 logger.info("flush_scan_cache: Could not clear all BSS entries. These remain:\n" + res)
1170
1171 def disconnect_and_stop_scan(self):
1172 self.request("DISCONNECT")
1173 res = self.request("ABORT_SCAN")
1174 for i in range(2 if "OK" in res else 1):
1175 self.wait_event(["CTRL-EVENT-DISCONNECTED",
1176 "CTRL-EVENT-SCAN-RESULTS"], timeout=0.5)
1177 self.dump_monitor()
1178
1179 def roam(self, bssid, fail_test=False, assoc_reject_ok=False):
1180 self.dump_monitor()
1181 if "OK" not in self.request("ROAM " + bssid):
1182 raise Exception("ROAM failed")
1183 if fail_test:
1184 if assoc_reject_ok:
1185 ev = self.wait_event(["CTRL-EVENT-CONNECTED",
1186 "CTRL-EVENT-ASSOC-REJECT"], timeout=1)
1187 else:
1188 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=1)
1189 if ev is not None and "CTRL-EVENT-ASSOC-REJECT" not in ev:
1190 raise Exception("Unexpected connection")
1191 self.dump_monitor()
1192 return
1193 ev = self.wait_event(["CTRL-EVENT-CONNECTED",
1194 "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
1195 if ev is None:
1196 raise Exception("Roaming with the AP timed out")
1197 if "CTRL-EVENT-ASSOC-REJECT" in ev:
1198 raise Exception("Roaming association rejected")
1199 self.dump_monitor()
1200
1201 def roam_over_ds(self, bssid, fail_test=False):
1202 self.dump_monitor()
1203 if "OK" not in self.request("FT_DS " + bssid):
1204 raise Exception("FT_DS failed")
1205 if fail_test:
1206 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=1)
1207 if ev is not None:
1208 raise Exception("Unexpected connection")
1209 self.dump_monitor()
1210 return
1211 ev = self.wait_event(["CTRL-EVENT-CONNECTED",
1212 "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
1213 if ev is None:
1214 raise Exception("Roaming with the AP timed out")
1215 if "CTRL-EVENT-ASSOC-REJECT" in ev:
1216 raise Exception("Roaming association rejected")
1217 self.dump_monitor()
1218
1219 def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None,
1220 new_passphrase=None, no_wait=False):
1221 self.dump_monitor()
1222 if new_ssid:
1223 self.request("WPS_REG " + bssid + " " + pin + " " +
1224 binascii.hexlify(new_ssid.encode()).decode() + " " +
1225 key_mgmt + " " + cipher + " " +
1226 binascii.hexlify(new_passphrase.encode()).decode())
1227 if no_wait:
1228 return
1229 ev = self.wait_event(["WPS-SUCCESS"], timeout=15)
1230 else:
1231 self.request("WPS_REG " + bssid + " " + pin)
1232 if no_wait:
1233 return
1234 ev = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15)
1235 if ev is None:
1236 raise Exception("WPS cred timed out")
1237 ev = self.wait_event(["WPS-FAIL"], timeout=15)
1238 if ev is None:
1239 raise Exception("WPS timed out")
1240 self.wait_connected(timeout=15)
1241
1242 def relog(self):
1243 self.global_request("RELOG")
1244
1245 def wait_completed(self, timeout=10):
1246 for i in range(0, timeout * 2):
1247 if self.get_status_field("wpa_state") == "COMPLETED":
1248 return
1249 time.sleep(0.5)
1250 raise Exception("Timeout while waiting for COMPLETED state")
1251
1252 def get_capability(self, field):
1253 res = self.request("GET_CAPABILITY " + field)
1254 if "FAIL" in res:
1255 return None
1256 return res.split(' ')
1257
1258 def get_bss(self, bssid, ifname=None):
1259 if not ifname or ifname == self.ifname:
1260 res = self.request("BSS " + bssid)
1261 elif ifname == self.group_ifname:
1262 res = self.group_request("BSS " + bssid)
1263 else:
1264 return None
1265
1266 if "FAIL" in res:
1267 return None
1268 lines = res.splitlines()
1269 vals = dict()
1270 for l in lines:
1271 [name, value] = l.split('=', 1)
1272 vals[name] = value
1273 if len(vals) == 0:
1274 return None
1275 return vals
1276
1277 def get_pmksa(self, bssid):
1278 res = self.request("PMKSA")
1279 lines = res.splitlines()
1280 for l in lines:
1281 if bssid not in l:
1282 continue
1283 vals = dict()
1284 try:
1285 [index, aa, pmkid, expiration, opportunistic] = l.split(' ')
1286 cache_id = None
1287 except ValueError:
1288 [index, aa, pmkid, expiration, opportunistic, cache_id] = l.split(' ')
1289 vals['index'] = index
1290 vals['pmkid'] = pmkid
1291 vals['expiration'] = expiration
1292 vals['opportunistic'] = opportunistic
1293 if cache_id != None:
1294 vals['cache_id'] = cache_id
1295 return vals
1296 return None
1297
1298 def get_sta(self, addr, info=None, next=False):
1299 cmd = "STA-NEXT " if next else "STA "
1300 if addr is None:
1301 res = self.request("STA-FIRST")
1302 elif info:
1303 res = self.request(cmd + addr + " " + info)
1304 else:
1305 res = self.request(cmd + addr)
1306 lines = res.splitlines()
1307 vals = dict()
1308 first = True
1309 for l in lines:
1310 if first:
1311 vals['addr'] = l
1312 first = False
1313 else:
1314 [name, value] = l.split('=', 1)
1315 vals[name] = value
1316 return vals
1317
1318 def mgmt_rx(self, timeout=5):
1319 ev = self.wait_event(["MGMT-RX"], timeout=timeout)
1320 if ev is None:
1321 return None
1322 msg = {}
1323 items = ev.split(' ')
1324 field, val = items[1].split('=')
1325 if field != "freq":
1326 raise Exception("Unexpected MGMT-RX event format: " + ev)
1327 msg['freq'] = val
1328
1329 field, val = items[2].split('=')
1330 if field != "datarate":
1331 raise Exception("Unexpected MGMT-RX event format: " + ev)
1332 msg['datarate'] = val
1333
1334 field, val = items[3].split('=')
1335 if field != "ssi_signal":
1336 raise Exception("Unexpected MGMT-RX event format: " + ev)
1337 msg['ssi_signal'] = val
1338
1339 frame = binascii.unhexlify(items[4])
1340 msg['frame'] = frame
1341
1342 hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
1343 msg['fc'] = hdr[0]
1344 msg['subtype'] = (hdr[0] >> 4) & 0xf
1345 hdr = hdr[1:]
1346 msg['duration'] = hdr[0]
1347 hdr = hdr[1:]
1348 msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
1349 hdr = hdr[6:]
1350 msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
1351 hdr = hdr[6:]
1352 msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
1353 hdr = hdr[6:]
1354 msg['seq_ctrl'] = hdr[0]
1355 msg['payload'] = frame[24:]
1356
1357 return msg
1358
1359 def wait_connected(self, timeout=10, error="Connection timed out"):
1360 ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout)
1361 if ev is None:
1362 raise Exception(error)
1363 return ev
1364
1365 def wait_disconnected(self, timeout=None, error="Disconnection timed out"):
1366 if timeout is None:
1367 timeout = 10 if self.hostname is None else 30
1368 ev = self.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=timeout)
1369 if ev is None:
1370 raise Exception(error)
1371 return ev
1372
1373 def get_group_ifname(self):
1374 return self.group_ifname if self.group_ifname else self.ifname
1375
1376 def get_config(self):
1377 res = self.request("DUMP")
1378 if res.startswith("FAIL"):
1379 raise Exception("DUMP failed")
1380 lines = res.splitlines()
1381 vals = dict()
1382 for l in lines:
1383 [name, value] = l.split('=', 1)
1384 vals[name] = value
1385 return vals
1386
1387 def asp_provision(self, peer, adv_id, adv_mac, session_id, session_mac,
1388 method="1000", info="", status=None, cpt=None, role=None):
1389 if status is None:
1390 cmd = "P2P_ASP_PROVISION"
1391 params = "info='%s' method=%s" % (info, method)
1392 else:
1393 cmd = "P2P_ASP_PROVISION_RESP"
1394 params = "status=%d" % status
1395
1396 if role is not None:
1397 params += " role=" + role
1398 if cpt is not None:
1399 params += " cpt=" + cpt
1400
1401 if "OK" not in self.global_request("%s %s adv_id=%s adv_mac=%s session=%d session_mac=%s %s" %
1402 (cmd, peer, adv_id, adv_mac, session_id, session_mac, params)):
1403 raise Exception("%s request failed" % cmd)
1404
1405 def note(self, txt):
1406 self.request("NOTE " + txt)
1407
1408 def wait_regdom(self, country_ie=False):
1409 for i in range(5):
1410 ev = self.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1411 if ev is None:
1412 break
1413 if country_ie:
1414 if "init=COUNTRY_IE" in ev:
1415 break
1416 else:
1417 break
1418
1419 def dpp_qr_code(self, uri):
1420 res = self.request("DPP_QR_CODE " + uri)
1421 if "FAIL" in res:
1422 raise Exception("Failed to parse QR Code URI")
1423 return int(res)
1424
1425 def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
1426 curve=None, key=None):
1427 cmd = "DPP_BOOTSTRAP_GEN type=" + type
1428 if chan:
1429 cmd += " chan=" + chan
1430 if mac:
1431 if mac is True:
1432 mac = self.own_addr()
1433 cmd += " mac=" + mac.replace(':', '')
1434 if info:
1435 cmd += " info=" + info
1436 if curve:
1437 cmd += " curve=" + curve
1438 if key:
1439 cmd += " key=" + key
1440 res = self.request(cmd)
1441 if "FAIL" in res:
1442 raise Exception("Failed to generate bootstrapping info")
1443 return int(res)
1444
1445 def dpp_listen(self, freq, netrole=None, qr=None, role=None):
1446 cmd = "DPP_LISTEN " + str(freq)
1447 if netrole:
1448 cmd += " netrole=" + netrole
1449 if qr:
1450 cmd += " qr=" + qr
1451 if role:
1452 cmd += " role=" + role
1453 if "OK" not in self.request(cmd):
1454 raise Exception("Failed to start listen operation")
1455
1456 def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
1457 extra=None, own=None, role=None, neg_freq=None,
1458 ssid=None, passphrase=None, expect_fail=False,
1459 tcp_addr=None, tcp_port=None):
1460 cmd = "DPP_AUTH_INIT"
1461 if peer is None:
1462 peer = self.dpp_qr_code(uri)
1463 cmd += " peer=%d" % peer
1464 if own is not None:
1465 cmd += " own=%d" % own
1466 if role:
1467 cmd += " role=" + role
1468 if extra:
1469 cmd += " " + extra
1470 if conf:
1471 cmd += " conf=" + conf
1472 if configurator is not None:
1473 cmd += " configurator=%d" % configurator
1474 if neg_freq:
1475 cmd += " neg_freq=%d" % neg_freq
1476 if ssid:
1477 cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
1478 if passphrase:
1479 cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
1480 if tcp_addr:
1481 cmd += " tcp_addr=" + tcp_addr
1482 if tcp_port:
1483 cmd += " tcp_port=" + tcp_port
1484 res = self.request(cmd)
1485 if expect_fail:
1486 if "FAIL" not in res:
1487 raise Exception("DPP authentication started unexpectedly")
1488 return
1489 if "OK" not in res:
1490 raise Exception("Failed to initiate DPP Authentication")
1491
1492 def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
1493 extra=None, use_id=None, allow_fail=False):
1494 if use_id is None:
1495 id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
1496 else:
1497 id1 = use_id
1498 cmd = "own=%d " % id1
1499 if identifier:
1500 cmd += "identifier=%s " % identifier
1501 cmd += "init=1 "
1502 if role:
1503 cmd += "role=%s " % role
1504 if extra:
1505 cmd += extra + " "
1506 cmd += "code=%s" % code
1507 res = self.request("DPP_PKEX_ADD " + cmd)
1508 if allow_fail:
1509 return id1
1510 if "FAIL" in res:
1511 raise Exception("Failed to set PKEX data (initiator)")
1512 return id1
1513
1514 def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
1515 listen_role=None, use_id=None):
1516 if use_id is None:
1517 id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
1518 else:
1519 id0 = use_id
1520 cmd = "own=%d " % id0
1521 if identifier:
1522 cmd += "identifier=%s " % identifier
1523 cmd += "code=%s" % code
1524 res = self.request("DPP_PKEX_ADD " + cmd)
1525 if "FAIL" in res:
1526 raise Exception("Failed to set PKEX data (responder)")
1527 self.dpp_listen(freq, role=listen_role)
1528 return id0
1529
1530 def dpp_configurator_add(self, curve=None, key=None):
1531 cmd = "DPP_CONFIGURATOR_ADD"
1532 if curve:
1533 cmd += " curve=" + curve
1534 if key:
1535 cmd += " key=" + key
1536 res = self.request(cmd)
1537 if "FAIL" in res:
1538 raise Exception("Failed to add configurator")
1539 return int(res)
1540
1541 def dpp_configurator_remove(self, conf_id):
1542 res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
1543 if "OK" not in res:
1544 raise Exception("DPP_CONFIGURATOR_REMOVE failed")