raise
def test_connectivity_p2p(dev1, dev2):
- test_connectivity(dev1.ifname, dev2.ifname)
+ ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname
+ ifname2 = dev2.group_ifname if dev2.group_ifname else dev2.ifname
+ test_connectivity(ifname1, ifname2)
class WpaSupplicant:
def __init__(self, ifname):
self.ifname = ifname
+ self.group_ifname = None
self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
self.mon.attach()
logger.debug(self.ifname + ": CTRL: " + cmd)
return self.ctrl.request(cmd)
+ def group_request(self, cmd):
+ if self.group_ifname and self.group_ifname != self.ifname:
+ logger.debug(self.group_ifname + ": CTRL: " + cmd)
+ gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
+ return gctrl.request(cmd)
+ return self.request(cmd)
+
def ping(self):
return "PONG" in self.request("PING")
self.request("P2P_GROUP_REMOVE *")
self.request("REMOVE_NETWORK *")
self.request("REMOVE_CRED *")
+ self.group_ifname = None
def get_status(self, field):
res = self.request("STATUS")
res = {}
res['result'] = 'success'
res['ifname'] = s[2]
+ self.group_ifname = s[2]
res['role'] = s[3]
res['ssid'] = s[4]
res['freq'] = s[5]
def remove_group(self, ifname=None):
if ifname is None:
- ifname = self.ifname
+ ifname = self.group_ifname if self.group_ifname else self.iname
if "OK" not in self.request("P2P_GROUP_REMOVE " + ifname):
raise Exception("Group could not be removed")
+ self.group_ifname = None
def p2p_start_go(self):
self.dump_monitor()
def p2p_go_authorize_client(self, pin):
cmd = "WPS_PIN any " + pin
- if "FAIL" in self.request(cmd):
+ if "FAIL" in self.group_request(cmd):
raise Exception("Failed to authorize client connection on GO")
return None