]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/wpasupplicant.py
tests: Use a helper function for DPP_QR_CODE commands
[thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
1 # Python class for controlling wpa_supplicant
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 import binascii
11 import re
12 import struct
13 import wpaspy
14 import remotehost
15 import subprocess
16
17 logger = logging.getLogger()
18 wpas_ctrl = '/var/run/wpa_supplicant'
19
20 class WpaSupplicant:
def __init__(self, ifname=None, global_iface=None, hostname=None,
             port=9877, global_port=9878, monitor=True):
    """Set up control connections for one wpa_supplicant instance.

    ifname: interface to attach to right away (optional).
    global_iface: global control interface; when given, a global control
        connection (and a monitor connection if monitoring is enabled)
        is opened.
    hostname/port/global_port: used instead of local sockets when
        hostname is not None.
    monitor: whether event monitor connections are opened and attached.
    """
    self.monitor = monitor
    self.hostname = hostname
    self.group_ifname = None
    self.gctrl_mon = None
    self.host = remotehost.Host(hostname, ifname)
    self._group_dbg = None

    if not ifname:
        self.ifname = None
    else:
        self.set_ifname(ifname, hostname, port)
        drv = self.get_driver_status()
        # NOTE(review): 0x20000000 is presumably the driver capability bit
        # for a dedicated P2P Device interface - confirm against the
        # driver flag definitions.
        if 'capa.flags' in drv and int(drv['capa.flags'], 0) & 0x20000000:
            self.p2p_dev_ifname = 'p2p-dev-' + self.ifname
        else:
            self.p2p_dev_ifname = ifname

    self.global_iface = global_iface
    self.global_mon = None
    if not global_iface:
        return

    if hostname is None:
        self.global_ctrl = wpaspy.Ctrl(global_iface)
        if self.monitor:
            self.global_mon = wpaspy.Ctrl(global_iface)
        self.global_dbg = ""
    else:
        self.global_ctrl = wpaspy.Ctrl(hostname, global_port)
        if self.monitor:
            self.global_mon = wpaspy.Ctrl(hostname, global_port)
        self.global_dbg = hostname + "/" + str(global_port) + "/"
    if self.monitor:
        self.global_mon.attach()
56
def cmd_execute(self, cmd_array, shell=False):
    """Run a command and return (return code, output).

    Without a hostname the command is run locally through subprocess,
    with stderr merged into stdout and the output decoded to str. With
    shell=True the argument list is joined with spaces into one shell
    string. With a hostname the call is delegated to self.host.execute().
    """
    if self.hostname is not None:
        return self.host.execute(cmd_array)

    command = ' '.join(cmd_array) if shell else cmd_array
    proc = subprocess.Popen(command, shell=shell,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    output, _ = proc.communicate()
    return proc.returncode, output.decode()
70
def terminate(self):
    """Detach the global monitor and terminate the global control connection.

    Nothing happens when no global monitor connection is open.
    """
    monitor = self.global_mon
    if not monitor:
        return
    monitor.detach()
    self.global_mon = None
    self.global_ctrl.terminate()
    self.global_ctrl = None
77
def close_ctrl(self):
    """Drop the global connections (if monitored) and the interface ones."""
    monitor = self.global_mon
    if monitor:
        monitor.detach()
        self.global_mon = None
        self.global_ctrl = None
    self.remove_ifname()
84
def set_ifname(self, ifname, hostname=None, port=9877):
    """Open the per-interface control (and optional monitor) connection.

    ifname: interface name; stored in self.ifname.
    hostname/port: when hostname is not None the connection is made to
        that host and port, and self.host is replaced with a matching
        remotehost.Host; otherwise the socket under wpas_ctrl is used.

    The monitor connection is only opened and attached when
    self.monitor is true; self.mon is None otherwise.
    """
    self.ifname = ifname
    # Always define the attribute so that code running with monitoring
    # disabled sees None instead of hitting AttributeError.
    self.mon = None
    if hostname is not None:
        self.ctrl = wpaspy.Ctrl(hostname, port)
        if self.monitor:
            self.mon = wpaspy.Ctrl(hostname, port)
        self.host = remotehost.Host(hostname, ifname)
        self.dbg = hostname + "/" + ifname
    else:
        path = os.path.join(wpas_ctrl, ifname)
        self.ctrl = wpaspy.Ctrl(path)
        if self.monitor:
            self.mon = wpaspy.Ctrl(path)
        self.dbg = ifname
    if self.monitor:
        self.mon.attach()
100
def remove_ifname(self):
    """Forget the current interface and drop its control connections.

    Does nothing when no interface is set. The monitor connection is
    only detached when one exists: set_ifname() does not create it when
    monitoring is disabled.
    """
    if not self.ifname:
        return
    mon = getattr(self, "mon", None)
    if mon is not None:
        mon.detach()
    self.mon = None
    self.ctrl = None
    self.ifname = None
107
def get_ctrl_iface_port(self, ifname):
    """Return the UDP control port of ifname, or None for local setups.

    The port is looked up in the reply to the global "INTERFACES ctrl"
    command, on the line whose first word is ifname.

    Raises Exception when the interface is not listed or when its line
    carries no usable "ctrl_iface=udp:<port>" entry.
    """
    if self.hostname is None:
        return None

    prefix = "ctrl_iface=udp:"
    res = self.global_request("INTERFACES ctrl")
    for line in res.splitlines():
        words = line.split()
        # Blank lines have no first word; skip them instead of crashing.
        if not words or words[0] != ifname:
            continue
        pos = line.find(prefix)
        if pos == -1:
            raise Exception("Wrong ctrl_interface format")
        # Take only the digits that directly follow the prefix so that
        # trailing fields on the same line do not break the conversion.
        rest = line[pos + len(prefix):].split()
        port = rest[0].split(":")[0] if rest else ""
        if not port.isdigit():
            raise Exception("Wrong ctrl_interface format")
        return int(port)
    raise Exception("Could not find UDP port for " + ifname)
127
def interface_add(self, ifname, config="", driver="nl80211",
                  drv_params=None, br_ifname=None, create=False,
                  set_ifname=True, all_params=False, if_type=None):
    """Add a dynamic interface through the global INTERFACE_ADD command.

    The command is a TAB-separated list: ifname, config, driver, control
    interface parameters, then optionally driver parameters, bridge
    name, "create" and interface type. Empty placeholders are inserted
    for omitted middle fields so later fields keep their position.

    When create is false and set_ifname is true, this object is attached
    to the new interface and p2p_dev_ifname is derived from the driver
    capability flags.

    Raises Exception if the reply contains "FAIL".
    """
    # The control interface group is picked from the output of "id".
    # The exit status was never used to pick the group (an earlier
    # assignment based on it was immediately overwritten), so only the
    # output is inspected.
    _status, groups = self.host.execute(["id"])
    group = "admin" if "(admin)" in groups else "adm"

    cmd = "INTERFACE_ADD " + ifname + "\t" + config + "\t" + driver + "\tDIR=/var/run/wpa_supplicant GROUP=" + group
    if drv_params:
        cmd = cmd + '\t' + drv_params
    if br_ifname:
        if not drv_params:
            cmd += '\t'
        cmd += '\t' + br_ifname
    if create:
        if not br_ifname:
            cmd += '\t'
            if not drv_params:
                cmd += '\t'
        cmd += '\tcreate'
        if if_type:
            cmd += '\t' + if_type
    if all_params and not create:
        if not br_ifname:
            cmd += '\t'
            if not drv_params:
                cmd += '\t'
        cmd += '\t'
    if "FAIL" in self.global_request(cmd):
        raise Exception("Failed to add a dynamic wpa_supplicant interface")
    if not create and set_ifname:
        port = self.get_ctrl_iface_port(ifname)
        self.set_ifname(ifname, self.hostname, port)
        res = self.get_driver_status()
        if 'capa.flags' in res and int(res['capa.flags'], 0) & 0x20000000:
            self.p2p_dev_ifname = 'p2p-dev-' + self.ifname
        else:
            self.p2p_dev_ifname = ifname
166
def interface_remove(self, ifname):
    """Detach from the current interface, then ask for ifname to be removed."""
    self.remove_ifname()
    command = "INTERFACE_REMOVE " + ifname
    self.global_request(command)
170
def request(self, cmd, timeout=10):
    """Log cmd and send it on the per-interface control connection.

    Returns whatever the control connection's request() returns.
    """
    prefix = self.dbg + ": CTRL: "
    logger.debug(prefix + cmd)
    reply = self.ctrl.request(cmd, timeout=timeout)
    return reply
174
def global_request(self, cmd):
    """Send cmd on the global control connection.

    Falls back to the per-interface request() when no global interface
    was configured.
    """
    if self.global_iface is not None:
        label = self.ifname or self.global_iface
        logger.debug(self.global_dbg + label + ": CTRL(global): " + cmd)
        return self.global_ctrl.request(cmd)
    return self.request(cmd)
182
@property
def group_dbg(self):
    """Debug prefix for the group interface (cached after first use).

    Raises Exception if no group interface name is known and nothing
    has been cached yet.
    """
    cached = self._group_dbg
    if cached is not None:
        return cached
    if self.group_ifname is None:
        raise Exception("Cannot have group_dbg without group_ifname")
    if self.hostname is not None:
        self._group_dbg = self.hostname + "/" + self.group_ifname
    else:
        self._group_dbg = self.group_ifname
    return self._group_dbg
194
def group_request(self, cmd):
    """Send cmd to the group interface.

    When a separate group interface exists, a fresh control connection
    is opened to it for this one request; otherwise the regular
    per-interface request() is used.
    """
    group = self.group_ifname
    if not group or group == self.ifname:
        return self.request(cmd)

    if self.hostname is not None:
        port = self.get_ctrl_iface_port(group)
        gctrl = wpaspy.Ctrl(self.hostname, port)
    else:
        gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, group))
    logger.debug(self.group_dbg + ": CTRL(group): " + cmd)
    return gctrl.request(cmd)
205
def ping(self):
    """Return True if the interface answers PING with a reply containing PONG."""
    reply = self.request("PING")
    return "PONG" in reply
208
def global_ping(self):
    """Return True if the global interface answers PING with PONG."""
    reply = self.global_request("PING")
    return "PONG" in reply
211
def reset(self):
    """Bring the interface back to a clean state between tests.

    Flushes interface and P2P state, removes all networks, drops any
    group monitor connection, and then waits (up to 60 one-second
    polls) for a pending driver scan to finish. If the scan state never
    clears, the interface is bounced with ifconfig down/up. If any
    waiting was needed, FLUSH is issued once more.
    """
    self.dump_monitor()
    res = self.request("FLUSH")
    if "OK" not in res:
        logger.info("FLUSH to " + self.ifname + " failed: " + res)
    self.global_request("REMOVE_NETWORK all")
    self.global_request("SET p2p_no_group_iface 1")
    self.global_request("P2P_FLUSH")
    if self.gctrl_mon:
        try:
            self.gctrl_mon.detach()
        except Exception:
            # Best effort: the group interface may already be gone.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            pass
        self.gctrl_mon = None
    self.group_ifname = None
    self.dump_monitor()

    max_waits = 60
    waits = 0
    p2pdev = "p2p-dev-" + self.ifname
    while waits < max_waits:
        state1 = self.get_driver_status_field("scan_state")
        state2 = self.get_driver_status_field("scan_state", ifname=p2pdev)
        states = str(state1) + " " + str(state2)
        if "SCAN_STARTED" not in states and "SCAN_REQUESTED" not in states:
            break
        logger.info(self.ifname + ": Waiting for scan operation to complete before continuing")
        time.sleep(1)
        waits += 1
    if waits == max_waits:
        logger.error(self.ifname + ": Driver scan state did not clear")
        print("Trying to clear cfg80211/mac80211 scan state")
        for action in ("down", "up"):
            status, buf = self.host.execute(["ifconfig", self.ifname, action])
            if status != 0:
                logger.info("ifconfig failed: " + buf)
                logger.info(status)
    if waits > 0:
        # The ongoing scan could have discovered BSSes or P2P peers
        logger.info("Run FLUSH again since scan was in progress")
        self.request("FLUSH")
        self.dump_monitor()

    if not self.ping():
        logger.info("No PING response from " + self.ifname + " after reset")
260
def set(self, field, value):
    """Issue "SET <field> <value>"; raise Exception unless the reply has OK."""
    reply = self.request("SET " + field + " " + value)
    if "OK" in reply:
        return
    raise Exception("Failed to set wpa_supplicant parameter " + field)
264
def add_network(self):
    """Create a network block and return its id as an int.

    Raises Exception if the reply contains "FAIL".
    """
    reply = self.request("ADD_NETWORK")
    if "FAIL" in reply:
        raise Exception("ADD_NETWORK failed")
    return int(reply)
270
def remove_network(self, id):
    """Remove network block id; raise Exception if the reply has FAIL."""
    reply = self.request("REMOVE_NETWORK " + str(id))
    if "FAIL" in reply:
        raise Exception("REMOVE_NETWORK failed")
    return None
276
def get_network(self, id, field):
    """Return the raw GET_NETWORK reply, or None when it is exactly "FAIL\\n"."""
    reply = self.request("GET_NETWORK " + str(id) + " " + field)
    return None if reply == "FAIL\n" else reply
282
def set_network(self, id, field, value):
    """Set a network field to an unquoted value; raise Exception on FAIL."""
    command = "SET_NETWORK " + str(id) + " " + field + " " + value
    if "FAIL" in self.request(command):
        raise Exception("SET_NETWORK failed")
    return None
288
def set_network_quoted(self, id, field, value):
    """Set a network field to value wrapped in double quotes; raise on FAIL."""
    command = "SET_NETWORK " + str(id) + " " + field + ' "' + value + '"'
    if "FAIL" in self.request(command):
        raise Exception("SET_NETWORK failed")
    return None
294
def p2pdev_request(self, cmd):
    """Send cmd via the global interface, addressed to the P2P device ifname."""
    routed = "IFNAME=" + self.p2p_dev_ifname + " " + cmd
    return self.global_request(routed)
297
def p2pdev_add_network(self):
    """Create a network block on the P2P device interface; return its id."""
    reply = self.p2pdev_request("ADD_NETWORK")
    if "FAIL" in reply:
        raise Exception("p2pdev ADD_NETWORK failed")
    return int(reply)
303
def p2pdev_set_network(self, id, field, value):
    """Set an unquoted network field on the P2P device interface."""
    command = "SET_NETWORK " + str(id) + " " + field + " " + value
    if "FAIL" in self.p2pdev_request(command):
        raise Exception("p2pdev SET_NETWORK failed")
    return None
309
def p2pdev_set_network_quoted(self, id, field, value):
    """Set a double-quoted network field on the P2P device interface."""
    command = "SET_NETWORK " + str(id) + " " + field + ' "' + value + '"'
    if "FAIL" in self.p2pdev_request(command):
        raise Exception("p2pdev SET_NETWORK failed")
    return None
315
def list_networks(self, p2p=False):
    """Return the configured networks as a list of dicts.

    p2p: when true, LIST_NETWORKS is sent through global_request()
        instead of the per-interface request().

    Each dict has the string keys 'id', 'ssid', 'bssid' and 'flags',
    taken from the four TAB-separated columns of a reply line.

    Raises ValueError for a non-empty line that does not have exactly
    four columns.
    """
    if p2p:
        res = self.global_request("LIST_NETWORKS")
    else:
        res = self.request("LIST_NETWORKS")

    networks = []
    for line in res.splitlines():
        # Only a line that *starts* with the header text is the header;
        # a substring test would also drop a network whose row happens
        # to contain "network id" (e.g. in its SSID).
        if not line or line.startswith("network id"):
            continue
        columns = line.split('\t')
        if len(columns) != 4:
            raise ValueError("Unexpected LIST_NETWORKS line: " + line)
        networks.append(dict(zip(('id', 'ssid', 'bssid', 'flags'), columns)))
    return networks
334
def hs20_enable(self, auto_interworking=False):
    """Enable interworking and hs20, and set auto_interworking to 1 or 0."""
    auto = "1" if auto_interworking else "0"
    for command in ("SET interworking 1",
                    "SET hs20 1",
                    "SET auto_interworking " + auto):
        self.request(command)
342
def interworking_add_network(self, bssid):
    """Run INTERWORKING_ADD_NETWORK for bssid and return the new network id.

    A reply containing either "FAIL" or "OK" is treated as failure.
    """
    reply = self.request("INTERWORKING_ADD_NETWORK " + bssid)
    if any(marker in reply for marker in ("FAIL", "OK")):
        raise Exception("INTERWORKING_ADD_NETWORK failed")
    return int(reply)
348
def add_cred(self):
    """Create a credential entry and return its id as an int."""
    reply = self.request("ADD_CRED")
    if "FAIL" in reply:
        raise Exception("ADD_CRED failed")
    return int(reply)
354
def remove_cred(self, id):
    """Remove credential id; raise Exception if the reply has FAIL."""
    reply = self.request("REMOVE_CRED " + str(id))
    if "FAIL" in reply:
        raise Exception("REMOVE_CRED failed")
    return None
360
def set_cred(self, id, field, value):
    """Set a credential field to an unquoted value; raise Exception on FAIL."""
    command = "SET_CRED " + str(id) + " " + field + " " + value
    if "FAIL" in self.request(command):
        raise Exception("SET_CRED failed")
    return None
366
def set_cred_quoted(self, id, field, value):
    """Set a credential field to value wrapped in double quotes; raise on FAIL."""
    command = "SET_CRED " + str(id) + " " + field + ' "' + value + '"'
    if "FAIL" in self.request(command):
        raise Exception("SET_CRED failed")
    return None
372
def get_cred(self, id, field):
    """Return the raw reply to GET_CRED for the given credential field."""
    command = "GET_CRED " + str(id) + " " + field
    return self.request(command)
375
def add_cred_values(self, params):
    """Create a credential and populate it from the params dict.

    Known string-valued fields are set quoted, known plain fields are
    set unquoted; keys of params that are in neither list are ignored.
    Returns the new credential id.
    """
    cred_id = self.add_cred()

    quoted_fields = ("realm", "username", "password", "domain", "imsi",
                     "excluded_ssid", "milenage", "ca_cert", "client_cert",
                     "private_key", "domain_suffix_match", "provisioning_sp",
                     "roaming_partner", "phase1", "phase2",
                     "private_key_passwd", "roaming_consortiums")
    plain_fields = ("eap", "roaming_consortium", "priority",
                    "required_roaming_consortium", "sp_priority",
                    "max_bss_load", "update_identifier", "req_conn_capab",
                    "min_dl_bandwidth_home", "min_ul_bandwidth_home",
                    "min_dl_bandwidth_roaming", "min_ul_bandwidth_roaming")

    # Quoted fields are applied first, then the plain ones, each group in
    # the order listed above.
    for names, setter in ((quoted_fields, self.set_cred_quoted),
                          (plain_fields, self.set_cred)):
        for name in names:
            if name in params:
                setter(cred_id, name, params[name])

    return cred_id
398
def select_network(self, id, freq=None):
    """Issue SELECT_NETWORK for id, optionally with a freq= argument.

    Raises Exception if the reply contains "FAIL".
    """
    suffix = " freq=" + str(freq) if freq else ""
    reply = self.request("SELECT_NETWORK " + str(id) + suffix)
    if "FAIL" in reply:
        raise Exception("SELECT_NETWORK failed")
    return None
408
def mesh_group_add(self, id):
    """Issue MESH_GROUP_ADD for network id; raise Exception on FAIL."""
    reply = self.request("MESH_GROUP_ADD " + str(id))
    if "FAIL" in reply:
        raise Exception("MESH_GROUP_ADD failed")
    return None
414
def mesh_group_remove(self):
    """Issue MESH_GROUP_REMOVE for this interface; raise Exception on FAIL."""
    reply = self.request("MESH_GROUP_REMOVE " + str(self.ifname))
    if "FAIL" in reply:
        raise Exception("MESH_GROUP_REMOVE failed")
    return None
420
def connect_network(self, id, timeout=None):
    """Select network id and wait until connected.

    timeout defaults to 10 seconds locally and 60 seconds when a remote
    hostname is in use. Pending monitor events are drained before and
    after.
    """
    if timeout is None:
        timeout = 60 if self.hostname is not None else 10
    self.dump_monitor()
    self.select_network(id)
    self.wait_connected(timeout=timeout)
    self.dump_monitor()
428
def get_status(self, extra=None):
    """Return the STATUS reply as a name -> value dict.

    extra: optional suffix; the command becomes "STATUS-<extra>".
    Lines without '=' are logged and skipped.
    """
    command = "STATUS" + ("-" + extra if extra else "")
    vals = dict()
    for line in self.request(command).splitlines():
        name, sep, value = line.partition('=')
        if not sep:
            logger.info(self.ifname + ": Ignore unexpected STATUS line: " + line)
            continue
        vals[name] = value
    return vals
444
def get_status_field(self, field, extra=None):
    """Return one field of get_status(extra), or None when it is absent."""
    return self.get_status(extra).get(field)
450
def get_group_status(self, extra=None):
    """Return the group interface STATUS reply as a name -> value dict.

    extra: optional suffix; the command becomes "STATUS-<extra>".
    Lines without '=' are logged and skipped.
    """
    command = "STATUS" + ("-" + extra if extra else "")
    vals = dict()
    for line in self.group_request(command).splitlines():
        name, sep, value = line.partition('=')
        if sep:
            vals[name] = value
        else:
            logger.info(self.ifname + ": Ignore unexpected status line: " + line)
    return vals
467
def get_group_status_field(self, field, extra=None):
    """Return one field of get_group_status(extra), or None when absent."""
    return self.get_group_status(extra).get(field)
473
def get_driver_status(self, ifname=None):
    """Return the STATUS-DRIVER reply as a name -> value dict.

    ifname: when given, the command is routed through the global
        interface with an "IFNAME=<ifname>" prefix; otherwise it is sent
        on this interface.

    An empty dict is returned when the reply starts with "FAIL". Lines
    without '=' are logged and skipped.
    """
    if ifname is not None:
        res = self.global_request("IFNAME=%s STATUS-DRIVER" % ifname)
    else:
        res = self.request("STATUS-DRIVER")
    if res.startswith("FAIL"):
        return dict()

    vals = dict()
    for line in res.splitlines():
        name, sep, value = line.partition('=')
        if sep:
            vals[name] = value
        else:
            logger.info(self.ifname + ": Ignore unexpected status-driver line: " + line)
    return vals
491
def get_driver_status_field(self, field, ifname=None):
    """Return one field of get_driver_status(ifname), or None when absent."""
    return self.get_driver_status(ifname).get(field)
497
def get_mcc(self):
    """Return the driver's capa.num_multichan_concurrent value, at least 1."""
    reported = int(self.get_driver_status_field('capa.num_multichan_concurrent'))
    if reported < 2:
        return 1
    return reported
501
def get_mib(self):
    """Return the MIB reply as a name -> value dict.

    Lines without '=' are logged and skipped.
    """
    vals = dict()
    for line in self.request("MIB").splitlines():
        name, sep, value = line.partition('=')
        if not sep:
            logger.info(self.ifname + ": Ignore unexpected MIB line: " + line)
            continue
        vals[name] = value
    return vals
513
def p2p_dev_addr(self):
    """Return the p2p_device_address STATUS field (None when absent)."""
    address = self.get_status_field("p2p_device_address")
    return address
516
def p2p_interface_addr(self):
    """Return the address field of the group interface STATUS (None when absent)."""
    address = self.get_group_status_field("address")
    return address
519
def own_addr(self):
    """Return the P2P interface address, or the P2P device address.

    The device address is used as a fallback when reading the interface
    address raises an exception.
    """
    try:
        return self.p2p_interface_addr()
    except Exception:
        # Fallback only for ordinary errors; KeyboardInterrupt and
        # SystemExit must still be able to stop the test run.
        return self.p2p_dev_addr()
526
def p2p_listen(self):
    """Issue P2P_LISTEN on the global interface and return the reply."""
    reply = self.global_request("P2P_LISTEN")
    return reply
529
def p2p_ext_listen(self, period, interval):
    """Issue "P2P_EXT_LISTEN <period> <interval>" and return the reply."""
    command = "P2P_EXT_LISTEN %d %d" % (period, interval)
    return self.global_request(command)
532
def p2p_cancel_ext_listen(self):
    """Issue P2P_EXT_LISTEN without arguments and return the reply."""
    reply = self.global_request("P2P_EXT_LISTEN")
    return reply
535
def p2p_find(self, social=False, progressive=False, dev_id=None,
             dev_type=None, delay=None, freq=None):
    """Build and issue a P2P_FIND command; return the reply.

    social takes precedence over progressive for the type= argument.
    The remaining arguments are appended only when truthy.
    """
    parts = ["P2P_FIND"]
    if social:
        parts.append("type=social")
    elif progressive:
        parts.append("type=progressive")
    if dev_id:
        parts.append("dev_id=" + dev_id)
    if dev_type:
        parts.append("dev_type=" + dev_type)
    if delay:
        parts.append("delay=" + str(delay))
    if freq:
        parts.append("freq=" + str(freq))
    return self.global_request(" ".join(parts))
552
def p2p_stop_find(self):
    """Issue P2P_STOP_FIND on the global interface and return the reply."""
    reply = self.global_request("P2P_STOP_FIND")
    return reply
555
def wps_read_pin(self):
    """Fetch a WPS PIN, store it in self.pin and return it.

    Raises Exception if the reply contains "FAIL" (self.pin then holds
    that reply).
    """
    reply = self.request("WPS_PIN get")
    self.pin = reply.rstrip("\n")
    if "FAIL" in self.pin:
        raise Exception("Could not generate PIN")
    return self.pin
561
def peer_known(self, peer, full=True):
    """Return True if the P2P_PEER reply mentions peer.

    With full=True the reply must additionally not contain the
    "[PROBE_REQ_ONLY]" marker.
    """
    reply = self.global_request("P2P_PEER " + peer)
    listed = peer.lower() in reply.lower()
    if not listed:
        return False
    return (not full) or "[PROBE_REQ_ONLY]" not in reply
569
def discover_peer(self, peer, full=True, timeout=15, social=True,
                  force_find=False, freq=None):
    """Make sure peer is known, starting a P2P find if needed.

    Returns True at once when the peer is already known and force_find
    is false. Otherwise a find is started and peer_known() is polled
    every 0.25 seconds for up to timeout seconds. Returns False if the
    peer never shows up.
    """
    logger.info(self.ifname + ": Trying to discover peer " + peer)
    already_known = not force_find and self.peer_known(peer, full)
    if already_known:
        return True

    self.p2p_find(social, freq=freq)
    polls = 0
    max_polls = timeout * 4
    while polls < max_polls:
        time.sleep(0.25)
        polls += 1
        if self.peer_known(peer, full):
            return True
    return False
583
def get_peer(self, peer):
    """Return the P2P_PEER reply for peer as a name -> value dict.

    Only lines containing '=' contribute entries. Raises Exception if
    the reply does not mention peer.
    """
    reply = self.global_request("P2P_PEER " + peer)
    if peer.lower() not in reply.lower():
        raise Exception("Peer information not available")
    return dict(line.split('=', 1)
                for line in reply.splitlines() if '=' in line)
595
def group_form_result(self, ev, expect_failure=False, go_neg_res=None):
    """Parse a group formation event into a result dict.

    ev: the event line (P2P-GROUP-STARTED, or with expect_failure the
        P2P-GO-NEG-FAILURE event).
    expect_failure: when true, ev must not be P2P-GROUP-STARTED; the
        result is {'result': 'go-neg-failed', 'status': <int>} or None
        if ev is not a parsable P2P-GO-NEG-FAILURE event.
    go_neg_res: optional P2P-GO-NEG-SUCCESS event; adds 'go_neg_role'
        and 'go_neg_freq' to the result.

    On success the dict has 'result', 'ifname', 'role', 'ssid', 'freq',
    'persistent', 'go_dev_addr', 'psk' or 'passphrase', and, when the
    event carries them, 'ip_addr', 'ip_mask' and 'go_ip_addr'. As a side
    effect self.group_ifname is updated and a monitor connection to the
    group interface is opened on a best-effort basis.

    Raises Exception when the event does not match expectations or
    cannot be parsed.
    """
    if expect_failure:
        if "P2P-GROUP-STARTED" in ev:
            raise Exception("Group formation succeeded when expecting failure")
        exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)'
        s = re.split(exp, ev)
        if len(s) < 3:
            return None
        res = {}
        res['result'] = 'go-neg-failed'
        res['status'] = int(s[2])
        return res

    if "P2P-GROUP-STARTED" not in ev:
        raise Exception("No P2P-GROUP-STARTED event seen")

    # Try the long form (with IP address fields) first, then fall back
    # to the short form.
    exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*) ip_addr=([0-9.]*) ip_mask=([0-9.]*) go_ip_addr=([0-9.]*)'
    s = re.split(exp, ev)
    if len(s) < 11:
        exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)'
        s = re.split(exp, ev)
        if len(s) < 8:
            raise Exception("Could not parse P2P-GROUP-STARTED")
    res = {}
    res['result'] = 'success'
    res['ifname'] = s[2]
    self.group_ifname = s[2]
    # The group_dbg property caches its value; drop the cache so the
    # debug prefix follows the new group interface name.
    self._group_dbg = None
    try:
        if self.hostname is None:
            self.gctrl_mon = wpaspy.Ctrl(os.path.join(wpas_ctrl,
                                                      self.group_ifname))
        else:
            port = self.get_ctrl_iface_port(self.group_ifname)
            self.gctrl_mon = wpaspy.Ctrl(self.hostname, port)
        self.gctrl_mon.attach()
    except Exception:
        # Best effort only; KeyboardInterrupt/SystemExit are not caught.
        logger.debug("Could not open monitor socket for group interface")
        self.gctrl_mon = None
    res['role'] = s[3]
    res['ssid'] = s[4]
    res['freq'] = s[5]
    res['persistent'] = "[PERSISTENT]" in ev
    p = re.match(r'psk=([0-9a-f]*)', s[6])
    if p:
        res['psk'] = p.group(1)
    p = re.match(r'passphrase="(.*)"', s[6])
    if p:
        res['passphrase'] = p.group(1)
    res['go_dev_addr'] = s[7]

    # With the short pattern s[8] is the unparsed remainder of the event
    # (possibly the persistent marker), not an IP address.
    if len(s) > 8 and len(s[8]) > 0 and "[PERSISTENT]" not in s[8]:
        res['ip_addr'] = s[8]
    if len(s) > 9:
        res['ip_mask'] = s[9]
    if len(s) > 10:
        res['go_ip_addr'] = s[10]

    if go_neg_res:
        exp = r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)'
        s = re.split(exp, go_neg_res)
        if len(s) < 4:
            raise Exception("Could not parse P2P-GO-NEG-SUCCESS")
        res['go_neg_role'] = s[2]
        res['go_neg_freq'] = s[3]

    return res
665
def p2p_go_neg_auth(self, peer, pin, method, go_intent=None,
                    persistent=False, freq=None, freq2=None,
                    max_oper_chwidth=None, ht40=False, vht=False):
    """Authorize GO negotiation with peer via "P2P_CONNECT ... auth".

    The peer is discovered first. pin is included only when truthy.
    go_intent is included whenever it is not None, so an intent of 0 is
    sent explicitly (matching p2p_go_neg_init). The remaining options
    are appended only when truthy.

    Raises Exception if the peer is not found or the command does not
    return OK.
    """
    if not self.discover_peer(peer):
        raise Exception("Peer " + peer + " not found")
    self.dump_monitor()
    if pin:
        cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth"
    else:
        cmd = "P2P_CONNECT " + peer + " " + method + " auth"
    # "if go_intent:" would silently drop a requested intent of 0.
    if go_intent is not None:
        cmd = cmd + ' go_intent=' + str(go_intent)
    if freq:
        cmd = cmd + ' freq=' + str(freq)
    if freq2:
        cmd = cmd + ' freq2=' + str(freq2)
    if max_oper_chwidth:
        cmd = cmd + ' max_oper_chwidth=' + str(max_oper_chwidth)
    if ht40:
        cmd = cmd + ' ht40'
    if vht:
        cmd = cmd + ' vht'
    if persistent:
        cmd = cmd + " persistent"
    if "OK" in self.global_request(cmd):
        return None
    raise Exception("P2P_CONNECT (auth) failed")
693
def p2p_go_neg_auth_result(self, timeout=None, expect_failure=False):
    """Wait for the outcome of an authorized GO negotiation.

    timeout defaults to 1 second when a failure is expected, else 5.
    Returns the group_form_result() dict, or None when a wait timed out
    while a failure was expected. Raises Exception on an unexpected
    timeout.
    """
    if timeout is None:
        timeout = 1 if expect_failure else 5

    def timed_out():
        # Shared handling of a wait that produced no event.
        if not expect_failure:
            raise Exception("Group formation timed out")
        return None

    go_neg_res = None
    ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
                                 "P2P-GO-NEG-FAILURE"], timeout)
    if ev is None:
        return timed_out()
    if "P2P-GO-NEG-SUCCESS" in ev:
        go_neg_res = ev
        ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
        if ev is None:
            return timed_out()
    self.dump_monitor()
    return self.group_form_result(ev, expect_failure, go_neg_res)
713
def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None,
                    expect_failure=False, persistent=False,
                    persistent_id=None, freq=None, provdisc=False,
                    wait_group=True, freq2=None, max_oper_chwidth=None,
                    ht40=False, vht=False):
    """Start GO negotiation with peer using P2P_CONNECT.

    With timeout == 0 the method returns None right after the command
    is accepted. Otherwise it waits for the negotiation result and, if
    wait_group is true, for the group to start, and returns the
    group_form_result() dict. With wait_group false the
    P2P-GO-NEG-SUCCESS event line is returned instead. None is returned
    when a wait timed out while a failure was expected.

    Raises Exception if the peer is not found, the command is rejected,
    or a wait times out unexpectedly.
    """
    if not self.discover_peer(peer):
        raise Exception("Peer " + peer + " not found")
    self.dump_monitor()

    parts = ["P2P_CONNECT", peer]
    if pin:
        parts.append(pin)
    parts.append(method)
    if go_intent is not None:
        parts.append('go_intent=' + str(go_intent))
    if freq:
        parts.append('freq=' + str(freq))
    if freq2:
        parts.append('freq2=' + str(freq2))
    if max_oper_chwidth:
        parts.append('max_oper_chwidth=' + str(max_oper_chwidth))
    if ht40:
        parts.append('ht40')
    if vht:
        parts.append('vht')
    if persistent:
        parts.append("persistent")
    elif persistent_id:
        parts.append("persistent=" + persistent_id)
    if provdisc:
        parts.append("provdisc")

    if "OK" not in self.global_request(" ".join(parts)):
        raise Exception("P2P_CONNECT failed")
    if timeout == 0:
        return None

    def timed_out():
        # Shared handling of a wait that produced no event.
        if not expect_failure:
            raise Exception("Group formation timed out")
        return None

    go_neg_res = None
    ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS",
                                 "P2P-GO-NEG-FAILURE"], timeout)
    if ev is None:
        return timed_out()
    if "P2P-GO-NEG-SUCCESS" in ev:
        if not wait_group:
            return ev
        go_neg_res = ev
        ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout)
        if ev is None:
            return timed_out()
    self.dump_monitor()
    return self.group_form_result(ev, expect_failure, go_neg_res)
766
def _wait_event(self, mon, pfx, events, timeout):
    """Wait on monitor connection mon for an event containing one of events.

    Every received event is logged with the self.dbg + pfx prefix.
    Returns the first matching event line, or None once timeout seconds
    have passed without a match.
    """
    deadline = os.times()[4] + timeout
    while True:
        # Drain everything that is already queued.
        while mon.pending():
            ev = mon.recv()
            logger.debug(self.dbg + pfx + ev)
            if any(event in ev for event in events):
                return ev
        remaining = deadline - os.times()[4]
        if remaining <= 0 or not mon.pending(timeout=remaining):
            return None
783
def wait_event(self, events, timeout=10):
    """Wait on the per-interface monitor for one of events (see _wait_event)."""
    monitor = self.mon
    return self._wait_event(monitor, ": ", events, timeout)
786
def wait_global_event(self, events, timeout):
    """Wait on the global monitor for one of events.

    Falls back to wait_event() when no global interface is configured.
    """
    if self.global_iface is not None:
        return self._wait_event(self.global_mon, "(global): ",
                                events, timeout)
    return self.wait_event(events, timeout)
792
def wait_group_event(self, events, timeout=10):
    """Wait for one of events on the group interface monitor.

    Without a separate group interface this is wait_event(). With one,
    None is returned right away when no group monitor connection is
    open; otherwise the first matching event line is returned, or None
    after timeout seconds.
    """
    separate_group = self.group_ifname and self.group_ifname != self.ifname
    if not separate_group:
        return self.wait_event(events, timeout)
    if self.gctrl_mon is None:
        return None

    deadline = os.times()[4] + timeout
    while True:
        # Drain everything that is already queued.
        while self.gctrl_mon.pending():
            ev = self.gctrl_mon.recv()
            logger.debug(self.group_dbg + "(group): " + ev)
            if any(event in ev for event in events):
                return ev
        remaining = deadline - os.times()[4]
        if remaining <= 0 or not self.gctrl_mon.pending(timeout=remaining):
            return None
814
def wait_go_ending_session(self):
    """Wait for the group to be removed because the GO ended the session.

    Any group monitor connection is dropped first. The wait is 3
    seconds locally and 10 seconds with a remote hostname.

    Raises Exception if no P2P-GROUP-REMOVED event arrives in time or
    its reason is not GO_ENDING_SESSION.
    """
    if self.gctrl_mon:
        try:
            self.gctrl_mon.detach()
        except Exception:
            # Best effort: the group interface may already be gone.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            pass
        self.gctrl_mon = None
    timeout = 3 if self.hostname is None else 10
    ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=timeout)
    if ev is None:
        raise Exception("Group removal event timed out")
    if "reason=GO_ENDING_SESSION" not in ev:
        raise Exception("Unexpected group removal reason")
828
def dump_monitor(self):
    """Drain and log all pending monitor events.

    Returns (count_iface, count_global): the number of events read from
    the per-interface monitor and from the global monitor. A monitor
    that does not exist (monitoring disabled, or no interface attached)
    contributes 0.
    """
    count_iface = 0
    count_global = 0

    # set_ifname() only creates self.mon when monitoring is enabled.
    mon = getattr(self, "mon", None)
    while mon is not None and mon.pending():
        ev = mon.recv()
        logger.debug(self.dbg + ": " + ev)
        count_iface += 1

    while self.global_mon and self.global_mon.pending():
        ev = self.global_mon.recv()
        # Same label fallback as global_request(): ifname may be None on
        # an instance that only has a global connection.
        label = self.ifname or self.global_iface
        logger.debug(self.global_dbg + label + "(global): " + ev)
        count_global += 1
    return (count_iface, count_global)
841
def remove_group(self, ifname=None):
    """Remove a P2P group with P2P_GROUP_REMOVE.

    ifname: group interface to remove; defaults to the recorded group
        interface, or this interface when none is recorded.

    Any group monitor connection is dropped first. Raises Exception if
    the command does not return OK.
    """
    if self.gctrl_mon:
        try:
            self.gctrl_mon.detach()
        except Exception:
            # Best effort: the group interface may already be gone.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            pass
        self.gctrl_mon = None
    if ifname is None:
        ifname = self.group_ifname if self.group_ifname else self.ifname
    if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname):
        raise Exception("Group could not be removed")
    self.group_ifname = None
854
def p2p_start_go(self, persistent=None, freq=None, no_event_clear=False):
    """Start an autonomous GO with P2P_GROUP_ADD and return the result dict.

    persistent: None adds nothing, True adds "persistent", any other
        value adds "persistent=<value>".
    freq: appended as freq=<freq> when truthy.
    no_event_clear: when true, pending events are not drained after the
        group has started.

    Raises Exception if the command is rejected or P2P-GROUP-STARTED is
    not seen within 5 seconds.
    """
    self.dump_monitor()
    parts = ["P2P_GROUP_ADD"]
    if persistent is True:
        parts.append("persistent")
    elif persistent is not None:
        parts.append("persistent=" + str(persistent))
    if freq:
        parts.append("freq=" + str(freq))

    if "OK" not in self.global_request(" ".join(parts)):
        raise Exception("P2P_GROUP_ADD failed")
    ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5)
    if ev is None:
        raise Exception("GO start up timed out")
    if not no_event_clear:
        self.dump_monitor()
    return self.group_form_result(ev)
874
def p2p_go_authorize_client(self, pin):
    """Issue "WPS_PIN any <pin>" on the group interface; raise on FAIL."""
    reply = self.group_request("WPS_PIN any " + pin)
    if "FAIL" in reply:
        raise Exception("Failed to authorize client connection on GO")
    return None
880
def p2p_go_authorize_client_pbc(self):
    """Issue WPS_PBC on the group interface; raise Exception on FAIL."""
    reply = self.group_request("WPS_PBC")
    if "FAIL" in reply:
        raise Exception("Failed to authorize client connection on GO")
    return None
886
def p2p_connect_group(self, go_addr, pin, timeout=0, social=False,
                      freq=None):
    """Join the group of the GO at go_addr with "P2P_CONNECT ... join".

    The GO is discovered first; when the first attempt fails and social
    is false, one more attempt is made without the freq argument. With
    timeout == 0 the method returns None once the command is accepted;
    otherwise it waits for the group to start and returns the
    group_form_result() dict.

    Raises Exception if the GO is not found, the command is rejected,
    the wait times out, or group formation fails.
    """
    self.dump_monitor()
    found = self.discover_peer(go_addr, social=social, freq=freq)
    if not found and (social or
                      not self.discover_peer(go_addr, social=social)):
        raise Exception("GO " + go_addr + " not found")
    self.p2p_stop_find()
    self.dump_monitor()

    cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
    if freq:
        cmd += " freq=" + str(freq)
    if "OK" not in self.global_request(cmd):
        raise Exception("P2P_CONNECT(join) failed")
    if timeout == 0:
        self.dump_monitor()
        return None

    ev = self.wait_global_event(["P2P-GROUP-STARTED",
                                 "P2P-GROUP-FORMATION-FAILURE"],
                                timeout)
    if ev is None:
        raise Exception("Joining the group timed out")
    if "P2P-GROUP-STARTED" not in ev:
        raise Exception("Failed to join the group")
    self.dump_monitor()
    return self.group_form_result(ev)
912
def tdls_setup(self, peer):
    """Issue TDLS_SETUP for peer on the group interface; raise on FAIL."""
    reply = self.group_request("TDLS_SETUP " + peer)
    if "FAIL" in reply:
        raise Exception("Failed to request TDLS setup")
    return None
918
def tdls_teardown(self, peer):
    """Issue TDLS_TEARDOWN for peer on the group interface; raise on FAIL."""
    reply = self.group_request("TDLS_TEARDOWN " + peer)
    if "FAIL" in reply:
        raise Exception("Failed to request TDLS teardown")
    return None
924
def tdls_link_status(self, peer):
    """Return the TDLS_LINK_STATUS reply for peer; raise Exception on FAIL."""
    reply = self.group_request("TDLS_LINK_STATUS " + peer)
    if "FAIL" in reply:
        raise Exception("Failed to request TDLS link status")
    return reply
931
def tspecs(self):
    """Return (tsid, up) tuples representing current tspecs"""
    status = self.request("WMM_AC_STATUS")
    # Every "TSID=<n> UP=<m>" pair in the reply becomes one int tuple.
    found = [(int(tsid), int(up))
             for tsid, up in re.findall(r"TSID=(\d+) UP=(\d+)", status)]

    logger.debug("tspecs: " + str(found))
    return found
940
def add_ts(self, tsid, up, direction="downlink", expect_failure=False,
           extra=None):
    """Request a traffic stream with WMM_AC_ADDTS and verify the outcome.

    tsid, up: integers placed in the command and checked in the events.
    direction: first argument of the command.
    expect_failure: when true, a TSPEC-REQ-FAILED event for tsid is
        awaited (2 seconds) and the method returns; otherwise a
        TSPEC-ADDED event is awaited (1 second) and (tsid, up) must be
        present in tspecs().
    extra: optional text appended to the command.

    Raises Exception when the command is not answered with OK, an event
    is missing or carries another tsid, or the new stream is not listed.
    """
    params = {
        "sba": 9000,
        "nominal_msdu_size": 1500,
        "min_phy_rate": 6000000,
        "mean_data_rate": 1500,
    }
    cmd = "WMM_AC_ADDTS %s tsid=%d up=%d" % (direction, tsid, up)
    for (key, value) in params.items():
        cmd += " %s=%d" % (key, value)
    if extra:
        cmd += " " + extra

    if self.request(cmd).strip() != "OK":
        raise Exception("ADDTS failed (tsid=%d up=%d)" % (tsid, up))

    tsid_tag = "tsid=%d" % tsid
    if expect_failure:
        ev = self.wait_event(["TSPEC-REQ-FAILED"], timeout=2)
        if ev is None:
            raise Exception("ADDTS failed (time out while waiting failure)")
        if tsid_tag not in ev:
            # The closing parenthesis was missing from this message.
            raise Exception("ADDTS failed (invalid tsid in TSPEC-REQ-FAILED)")
        return

    ev = self.wait_event(["TSPEC-ADDED"], timeout=1)
    if ev is None:
        raise Exception("ADDTS failed (time out)")
    if tsid_tag not in ev:
        raise Exception("ADDTS failed (invalid tsid in TSPEC-ADDED)")

    if (tsid, up) not in self.tspecs():
        raise Exception("ADDTS failed (tsid not in tspec list)")
974
def del_ts(self, tsid):
    """Delete a traffic stream with WMM_AC_DELTS and verify it is gone.

    Raises Exception when the command is not answered with OK, no
    TSPEC-REMOVED event for tsid arrives within 1 second, or tsid is
    still present in tspecs() afterwards.
    """
    reply = self.request("WMM_AC_DELTS %d" % (tsid))
    if reply.strip() != "OK":
        raise Exception("DELTS failed")

    ev = self.wait_event(["TSPEC-REMOVED"], timeout=1)
    if ev is None:
        raise Exception("DELTS failed (time out)")
    if "tsid=%d" % (tsid) not in ev:
        raise Exception("DELTS failed (invalid tsid in TSPEC-REMOVED)")

    remaining = [(t, u) for (t, u) in self.tspecs() if t == tsid]
    if len(remaining) > 0:
        raise Exception("DELTS failed (still in tspec list)")
988
def connect(self, ssid=None, ssid2=None, **kwargs):
    """Create a network block from keyword arguments and connect to it.

    ssid is set as a quoted string; ssid2 (used only when ssid is
    falsy) is set unquoted. Recognised keyword arguments with a truthy
    value are written to the network block, quoted or unquoted
    depending on the field. Special keywords:
      raw_psk, password_hex: unquoted values for psk / password.
      peerkey, okc: set peerkey / proactive_key_caching to 1.
      ocsp: stored via str().
      only_add_network: return right after configuring the network.
      wait_connect: when false, only select the network without waiting
        (default is to wait; 20 second timeout if "eap" was passed).

    Returns the network id.
    """
    logger.info("Connect STA " + self.ifname + " to AP")
    net_id = self.add_network()
    if ssid:
        self.set_network_quoted(net_id, "ssid", ssid)
    elif ssid2:
        self.set_network(net_id, "ssid", ssid2)

    quoted_fields = ("psk", "identity", "anonymous_identity", "password",
                     "ca_cert", "client_cert", "private_key",
                     "private_key_passwd", "ca_cert2", "client_cert2",
                     "private_key2", "phase1", "phase2",
                     "domain_suffix_match", "altsubject_match",
                     "subject_match", "pac_file", "dh_file",
                     "bgscan", "ht_mcs", "id_str", "openssl_ciphers",
                     "domain_match", "dpp_connector", "sae_password",
                     "sae_password_id")
    plain_fields = ("proto", "key_mgmt", "ieee80211w", "pairwise",
                    "group", "wep_key0", "wep_key1", "wep_key2", "wep_key3",
                    "wep_tx_keyidx", "scan_freq", "freq_list", "eap",
                    "eapol_flags", "fragment_size", "scan_ssid", "auth_alg",
                    "wpa_ptk_rekey", "disable_ht", "disable_vht", "bssid",
                    "disable_max_amsdu", "ampdu_factor", "ampdu_density",
                    "disable_ht40", "disable_sgi", "disable_ldpc",
                    "ht40_intolerant", "update_identifier", "mac_addr",
                    "erp", "bg_scan_period", "bssid_blacklist",
                    "bssid_whitelist", "mem_only_psk", "eap_workaround",
                    "engine", "fils_dh_group", "bssid_hint",
                    "dpp_csign", "dpp_csign_expiry",
                    "dpp_netaccesskey", "dpp_netaccesskey_expiry",
                    "group_mgmt", "owe_group",
                    "roaming_consortium_selection", "ocv",
                    "multi_ap_backhaul_sta", "rx_stbc", "tx_stbc")

    # Quoted fields first, then the plain ones, each in listed order.
    for names, setter in ((quoted_fields, self.set_network_quoted),
                          (plain_fields, self.set_network)):
        for name in names:
            value = kwargs.get(name)
            if value:
                setter(net_id, name, value)

    if kwargs.get('raw_psk'):
        self.set_network(net_id, "psk", kwargs['raw_psk'])
    if kwargs.get('password_hex'):
        self.set_network(net_id, "password", kwargs['password_hex'])
    if kwargs.get('peerkey'):
        self.set_network(net_id, "peerkey", "1")
    if kwargs.get('okc'):
        self.set_network(net_id, "proactive_key_caching", "1")
    if kwargs.get('ocsp'):
        self.set_network(net_id, "ocsp", str(kwargs['ocsp']))
    if kwargs.get('only_add_network'):
        return net_id

    if kwargs.get('wait_connect', True):
        if "eap" in kwargs:
            self.connect_network(net_id, timeout=20)
        else:
            self.connect_network(net_id)
    else:
        self.dump_monitor()
        self.select_network(net_id)
    return net_id
1050
def scan(self, type=None, freq=None, no_wait=False, only_new=False,
         passive=False):
    """Trigger a scan and, unless no_wait is set, wait for its outcome.

    The SCAN command is extended with TYPE=<type>, freq=<freq>, only_new=1
    and passive=1 according to the arguments. With no_wait the method
    returns as soon as the command has been accepted and the monitor
    socket is not dumped first.

    Raises Exception if the command is not accepted, if neither a
    CTRL-EVENT-SCAN-RESULTS nor a CTRL-EVENT-SCAN-FAILED event is received
    (wait_event() is called with 15 as its timeout argument), or if the
    scan is reported as failed.
    """
    parts = [("SCAN TYPE=" + type) if type else "SCAN"]
    if freq:
        parts.append("freq=" + str(freq))
    if only_new:
        parts.append("only_new=1")
    if passive:
        parts.append("passive=1")
    cmd = " ".join(parts)

    if not no_wait:
        self.dump_monitor()
    if "OK" not in self.request(cmd):
        raise Exception("Failed to trigger scan")
    if no_wait:
        return

    outcome = self.wait_event(["CTRL-EVENT-SCAN-RESULTS",
                               "CTRL-EVENT-SCAN-FAILED"], 15)
    if outcome is None:
        raise Exception("Scan timed out")
    if "CTRL-EVENT-SCAN-FAILED" in outcome:
        raise Exception("Scan failed: " + outcome)
1075
def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False,
                 passive=False):
    """Scan until get_bss() finds an entry for bssid.

    Unless force_scan is set, an entry that is already known ends the call
    without scanning. Otherwise up to ten scans with type="ONLY" are run,
    stopping as soon as the BSS is found.

    Raises Exception if the BSS is not found after the last scan.
    """
    if force_scan or self.get_bss(bssid) is None:
        for _attempt in range(10):
            self.scan(freq=freq, type="ONLY", only_new=only_new,
                      passive=passive)
            if self.get_bss(bssid) is not None:
                break
        else:
            # None of the scans produced an entry for the BSS
            raise Exception("Could not find BSS " + bssid + " in scan")
1086
def flush_scan_cache(self, freq=2417):
    """Try to empty the BSS table using BSS_FLUSH followed by a scan.

    The first attempt scans on freq and a second attempt, if needed, on
    2422. An attempt is considered successful when SCAN_RESULTS returns at
    most one line (presumably only the header line -- TODO confirm). If
    entries remain after both attempts, they are logged; no exception is
    raised for that case.
    """
    for scan_freq in (freq, 2422):
        self.request("BSS_FLUSH 0")
        self.scan(freq=scan_freq, only_new=True)
        results = self.request("SCAN_RESULTS")
        if len(results.splitlines()) <= 1:
            return
    logger.info("flush_scan_cache: Could not clear all BSS entries. These remain:\n" + results)
1097
def roam(self, bssid, fail_test=False, assoc_reject_ok=False):
    """Request a roam to bssid with the ROAM command and check the result.

    Normal mode: wait (timeout=10) for CTRL-EVENT-CONNECTED or
    CTRL-EVENT-ASSOC-REJECT and raise Exception on timeout or rejection.

    fail_test mode: the roam is expected not to result in a connection.
    The wait is limited to timeout=1 and a received event raises Exception
    unless it is an association rejection, which can be received only when
    assoc_reject_ok is set.

    The monitor socket is dumped before the command and after a
    successful check.
    """
    self.dump_monitor()
    if "OK" not in self.request("ROAM " + bssid):
        raise Exception("ROAM failed")

    if not fail_test:
        ev = self.wait_event(["CTRL-EVENT-CONNECTED",
                              "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
        if ev is None:
            raise Exception("Roaming with the AP timed out")
        if "CTRL-EVENT-ASSOC-REJECT" in ev:
            raise Exception("Roaming association rejected")
        self.dump_monitor()
        return

    events = ["CTRL-EVENT-CONNECTED"]
    if assoc_reject_ok:
        events.append("CTRL-EVENT-ASSOC-REJECT")
    ev = self.wait_event(events, timeout=1)
    if ev is not None and "CTRL-EVENT-ASSOC-REJECT" not in ev:
        raise Exception("Unexpected connection")
    self.dump_monitor()
1119
def roam_over_ds(self, bssid, fail_test=False):
    """Request a roam to bssid with the FT_DS command and check the result.

    With fail_test, any CTRL-EVENT-CONNECTED event received within the
    timeout=1 wait raises Exception. Otherwise the method waits
    (timeout=10) for CTRL-EVENT-CONNECTED or CTRL-EVENT-ASSOC-REJECT and
    raises Exception on timeout or rejection.

    The monitor socket is dumped before the command and after a
    successful check.
    """
    self.dump_monitor()
    if "OK" not in self.request("FT_DS " + bssid):
        raise Exception("FT_DS failed")

    if fail_test:
        connected = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=1)
        if connected is not None:
            raise Exception("Unexpected connection")
    else:
        result = self.wait_event(["CTRL-EVENT-CONNECTED",
                                  "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
        if result is None:
            raise Exception("Roaming with the AP timed out")
        if "CTRL-EVENT-ASSOC-REJECT" in result:
            raise Exception("Roaming association rejected")
    self.dump_monitor()
1137
def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None,
            new_passphrase=None, no_wait=False):
    """Run WPS_REG against bssid using pin.

    When new_ssid is given, the command also carries the hex-encoded new
    SSID, key_mgmt, cipher and the hex-encoded new passphrase, and the
    method then waits for WPS-SUCCESS. Without new_ssid it waits for
    WPS-CRED-RECEIVED followed by WPS-FAIL.
    NOTE(review): WPS-FAIL appears to be the expected end of the exchange
    in the second case -- confirm against the registrar behavior.

    With no_wait the method returns right after sending the command.
    Otherwise it raises Exception if an expected event times out and ends
    by calling wait_connected(timeout=15).
    """
    self.dump_monitor()

    cmd = "WPS_REG " + bssid + " " + pin
    if new_ssid:
        cmd += " " + binascii.hexlify(new_ssid.encode()).decode()
        cmd += " " + key_mgmt + " " + cipher
        cmd += " " + binascii.hexlify(new_passphrase.encode()).decode()
    self.request(cmd)
    if no_wait:
        return

    if new_ssid:
        ev = self.wait_event(["WPS-SUCCESS"], timeout=15)
    else:
        cred = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15)
        if cred is None:
            raise Exception("WPS cred timed out")
        ev = self.wait_event(["WPS-FAIL"], timeout=15)
    if ev is None:
        raise Exception("WPS timed out")
    self.wait_connected(timeout=15)
1160
def relog(self):
    """Send the RELOG command through the global control interface."""
    command = "RELOG"
    self.global_request(command)
1163
def wait_completed(self, timeout=10):
    """Poll the wpa_state status field until it reads COMPLETED.

    The state is checked timeout * 2 times with a 0.5 second sleep after
    each unsuccessful check.

    Raises Exception if COMPLETED is not seen within these checks.
    """
    for _poll in range(timeout * 2):
        state = self.get_status_field("wpa_state")
        if state == "COMPLETED":
            break
        time.sleep(0.5)
    else:
        raise Exception("Timeout while waiting for COMPLETED state")
1170
def get_capability(self, field):
    """Return the GET_CAPABILITY reply for field split on single spaces.

    Returns None if the reply contains "FAIL".
    """
    reply = self.request("GET_CAPABILITY " + field)
    return None if "FAIL" in reply else reply.split(' ')
1176
def get_bss(self, bssid, ifname=None):
    """Fetch the BSS entry for bssid as a dict of its name=value lines.

    The BSS command is sent on the main interface when ifname is not given
    or matches self.ifname, and through group_request() when ifname
    matches self.group_ifname.

    Returns None for any other ifname, for a reply that reports failure
    and for a reply without any lines.
    """
    if not ifname or ifname == self.ifname:
        res = self.request("BSS " + bssid)
    elif ifname == self.group_ifname:
        res = self.group_request("BSS " + bssid)
    else:
        return None

    # Only a reply that begins with FAIL is an error. A substring test
    # over the whole reply would also discard a valid entry whose value
    # (e.g., the SSID) happens to contain "FAIL".
    if res.startswith("FAIL"):
        return None

    vals = {}
    for line in res.splitlines():
        # Split on the first '=' only since values may contain '='
        name, value = line.split('=', 1)
        vals[name] = value
    return vals if vals else None
1195
def get_pmksa(self, bssid):
    """Return the first PMKSA cache entry whose line contains bssid.

    Each matching line of the PMKSA reply is expected to hold five
    space-separated fields (index, AA, PMKID, expiration, opportunistic)
    optionally followed by a sixth cache_id field.

    Returns a dict with the keys index, pmkid, expiration and
    opportunistic, plus cache_id when that field is present, or None if no
    line contains bssid.

    Raises ValueError if a matching line has another number of fields.
    """
    for line in self.request("PMKSA").splitlines():
        if bssid not in line:
            continue
        fields = line.split(' ')
        if len(fields) == 5:
            # Entry without the optional cache_id column
            fields.append(None)
        # Unpacking rejects any other field count with ValueError
        index, _aa, pmkid, expiration, opportunistic, cache_id = fields
        vals = {
            'index': index,
            'pmkid': pmkid,
            'expiration': expiration,
            'opportunistic': opportunistic,
        }
        if cache_id is not None:
            vals['cache_id'] = cache_id
        return vals
    return None
1216
def get_sta(self, addr, info=None, next=False):
    """Query station information and return it as a dict.

    addr None sends STA-FIRST. Otherwise "STA <addr>" is sent, or
    "STA-NEXT <addr>" when next is set, with info appended when given.

    The first line of the reply is stored under the key 'addr' and every
    following line is split at its first '=' into a name and a value. An
    empty reply yields an empty dict.
    """
    if addr is None:
        query = "STA-FIRST"
    else:
        query = ("STA-NEXT " if next else "STA ") + addr
        if info:
            query = query + " " + info

    lines = self.request(query).splitlines()
    vals = {}
    if lines:
        vals['addr'] = lines[0]
        for entry in lines[1:]:
            name, value = entry.split('=', 1)
            vals[name] = value
    return vals
1236
def mgmt_rx(self, timeout=5):
    """Wait for a MGMT-RX event and parse it into a dict.

    The event is split on spaces; items 1-3 must be freq=, datarate= and
    ssi_signal= (kept as strings) and item 4 is the frame as a hexdump.
    The first 24 bytes of the frame are unpacked as little endian frame
    control, duration, three 6-octet addresses (da, sa, bssid) and
    sequence control; the remaining bytes are stored as payload.

    Returns None if no event is received within timeout.
    Raises Exception if one of the three leading fields has another name.
    """
    ev = self.wait_event(["MGMT-RX"], timeout=timeout)
    if ev is None:
        return None

    items = ev.split(' ')
    msg = {}
    for pos, expected in enumerate(("freq", "datarate", "ssi_signal"), 1):
        field, val = items[pos].split('=')
        if field != expected:
            raise Exception("Unexpected MGMT-RX event format: " + ev)
        msg[expected] = val

    frame = binascii.unhexlify(items[4])
    msg['frame'] = frame

    # hdr: [0] fc, [1] duration, [2:8] da, [8:14] sa, [14:20] bssid,
    # [20] seq_ctrl
    hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
    addr_fmt = "%02x:%02x:%02x:%02x:%02x:%02x"
    msg['fc'] = hdr[0]
    msg['subtype'] = (hdr[0] >> 4) & 0xf
    msg['duration'] = hdr[1]
    msg['da'] = addr_fmt % hdr[2:8]
    msg['sa'] = addr_fmt % hdr[8:14]
    msg['bssid'] = addr_fmt % hdr[14:20]
    msg['seq_ctrl'] = hdr[20]
    msg['payload'] = frame[24:]

    return msg
1277
def wait_connected(self, timeout=10, error="Connection timed out"):
    """Wait for CTRL-EVENT-CONNECTED and return the event.

    Raises Exception(error) if the event is not received within timeout.
    """
    event = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout)
    if event is not None:
        return event
    raise Exception(error)
1283
def wait_disconnected(self, timeout=None, error="Disconnection timed out"):
    """Wait for CTRL-EVENT-DISCONNECTED and return the event.

    When timeout is None, 30 is used if self.hostname is set and 10
    otherwise.

    Raises Exception(error) if the event is not received within timeout.
    """
    if timeout is None:
        timeout = 30 if self.hostname is not None else 10
    event = self.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=timeout)
    if event is not None:
        return event
    raise Exception(error)
1291
def get_group_ifname(self):
    """Return self.group_ifname, or self.ifname when no group ifname is set."""
    return self.group_ifname or self.ifname
1294
def get_config(self):
    """Return the output of the DUMP command as a dict.

    Every line of the reply is split at its first '=' into a name and a
    value.

    Raises Exception if the reply starts with "FAIL".
    """
    reply = self.request("DUMP")
    if reply.startswith("FAIL"):
        raise Exception("DUMP failed")
    # Split on the first '=' only since values may contain '='
    return dict(line.split('=', 1) for line in reply.splitlines())
1305
def asp_provision(self, peer, adv_id, adv_mac, session_id, session_mac,
                  method="1000", info="", status=None, cpt=None, role=None):
    """Send a P2P ASP provisioning request or response to peer.

    Without status, P2P_ASP_PROVISION is sent with the info and method
    parameters. With status, P2P_ASP_PROVISION_RESP is sent with the
    status parameter instead. role and cpt are appended when they are not
    None. The command goes through the global control interface.

    Raises Exception if the reply does not contain "OK".
    """
    is_response = status is not None
    if is_response:
        cmd = "P2P_ASP_PROVISION_RESP"
        params = "status=%d" % status
    else:
        cmd = "P2P_ASP_PROVISION"
        params = "info='%s' method=%s" % (info, method)

    if role is not None:
        params += " role=" + role
    if cpt is not None:
        params += " cpt=" + cpt

    full_cmd = "%s %s adv_id=%s adv_mac=%s session=%d session_mac=%s %s" % (
        cmd, peer, adv_id, adv_mac, session_id, session_mac, params)
    reply = self.global_request(full_cmd)
    if "OK" not in reply:
        raise Exception("%s request failed" % cmd)
1323
def note(self, txt):
    """Send txt with the NOTE control interface command."""
    command = "NOTE " + txt
    self.request(command)
1326
def wait_regdom(self, country_ie=False):
    """Wait for a CTRL-EVENT-REGDOM-CHANGE event.

    Without country_ie, a single wait (timeout=1) is done. With
    country_ie, up to five waits are done and the method stops at the
    first event that contains "init=COUNTRY_IE" or at the first wait that
    times out. No exception is raised if no such event is seen.
    """
    for _attempt in range(5):
        ev = self.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
        if ev is None or not country_ie or "init=COUNTRY_IE" in ev:
            return
1337
def dpp_qr_code(self, uri):
    """Pass uri to the DPP_QR_CODE command and return the reply as an int.

    Raises Exception if the reply contains "FAIL".
    """
    reply = self.request("DPP_QR_CODE " + uri)
    if "FAIL" not in reply:
        return int(reply)
    raise Exception("Failed to parse QR Code URI")