import hwsim_utils
from wpasupplicant import WpaSupplicant
+from p2p_utils import *
def test_discovery(dev):
"""P2P device discovery and provision discovery"""
if ev is None:
raise Exception("Timeout on waiting for GO Negotiation Request")
+def test_discovery_ctrl_char_in_devname(dev):
+ """P2P device discovery and control character in Device Name"""
+ try:
+ _test_discovery_ctrl_char_in_devname(dev)
+ finally:
+ dev[1].global_request("SET device_name Device B")
+
+def _test_discovery_ctrl_char_in_devname(dev):
+ dev[1].global_request("SET device_name Device\nB")
+ addr0 = dev[0].p2p_dev_addr()
+ addr1 = dev[1].p2p_dev_addr()
+ res = dev[0].p2p_start_go(freq=2422)
+ bssid = dev[0].p2p_interface_addr()
+ pin = dev[1].wps_read_pin()
+ dev[0].p2p_go_authorize_client(pin)
+ dev[1].scan_for_bss(bssid, freq=2422)
+ dev[1].p2p_connect_group(addr0, pin, timeout=60, freq=2422)
+ if not dev[2].discover_peer(addr1, social=False, freq=2422, timeout=5):
+ if not dev[2].discover_peer(addr1, social=False, freq=2422, timeout=5):
+ if not dev[2].discover_peer(addr1, social=False, freq=2422,
+ timeout=5):
+ raise Exception("Could not discover group client")
+ devname = dev[2].get_peer(addr1)['device_name']
+ dev[2].p2p_stop_find()
+ if devname != "Device_B":
+ raise Exception("Unexpected device_name from group client: " + devname)
+
+ terminate_group(dev[0], dev[1])
+ dev[2].request("P2P_FLUSH")
+
+ dev[1].p2p_listen()
+ if not dev[2].discover_peer(addr1, social=True, timeout=10):
+ raise Exception("Could not discover peer")
+ devname = dev[2].get_peer(addr1)['device_name']
+ dev[2].p2p_stop_find()
+ if devname != "Device_B":
+ raise Exception("Unexpected device_name from peer: " + devname)
+
def test_discovery_dev_type(dev):
"""P2P device discovery with Device Type filter"""
dev[1].request("SET sec_device_type 1-0050F204-2")
return True
return "[PROBE_REQ_ONLY]" not in res
- def discover_peer(self, peer, full=True, timeout=15, social=True, force_find=False):
+ def discover_peer(self, peer, full=True, timeout=15, social=True,
+ force_find=False, freq=None):
logger.info(self.ifname + ": Trying to discover peer " + peer)
if not force_find and self.peer_known(peer, full):
return True
- self.p2p_find(social)
+ self.p2p_find(social, freq=freq)
count = 0
while count < timeout * 4:
time.sleep(0.25)
def p2p_connect_group(self, go_addr, pin, timeout=0, social=False,
freq=None):
self.dump_monitor()
- if not self.discover_peer(go_addr, social=social):
+ if not self.discover_peer(go_addr, social=social, freq=freq):
if social or not self.discover_peer(go_addr, social=social):
raise Exception("GO " + go_addr + " not found")
self.dump_monitor()