from wpasupplicant import WpaSupplicant
from p2p_utils import *
from test_gas import start_ap
+from test_cfg80211 import nl80211_remain_on_channel
def test_discovery(dev):
"""P2P device discovery and provision discovery"""
logger.info("Device discovery after fragmented GAS took %f seconds" % (end - start))
if end - start > 1.3:
raise Exception("Device discovery took unexpectedly long time")
+
+def test_discovery_listen_find(dev):
+ """P2P_LISTEN immediately followed by P2P_FIND"""
+ # Request an external remain-on-channel operation to delay start of the ROC
+ # for the following p2p_listen() enough to get p2p_find() processed before
+ # the ROC started event shows up. This is done to test a code path where the
+ # p2p_find() needs to clear the wait for the pending listen operation
+ # (p2p->pending_listen_freq).
+ ifindex = int(dev[0].get_driver_status_field("ifindex"))
+ nl80211_remain_on_channel(dev[0], ifindex, 2417, 200)
+
+ addr0 = dev[0].p2p_dev_addr()
+ dev[0].p2p_listen()
+ dev[0].p2p_find(social=True)
+ time.sleep(0.4)
+ dev[1].p2p_listen()
+ ev = dev[0].wait_global_event(["P2P-DEVICE-FOUND"], timeout=1.2)
+ if not dev[1].discover_peer(addr0):
+ raise Exception("Device discovery timed out")
+ if ev is None:
+ raise Exception("Did not find peer quickly enough after stopped P2P_LISTEN")