logger = logging.getLogger()
import hostapd
+from test_p2p_persistent import form
+from test_p2p_persistent import invite
MGMT_SUBTYPE_PROBE_REQ = 4
MGMT_SUBTYPE_ACTION = 13
if ev is not None:
raise Exception("Unexpected invitation event")
+def test_p2p_msg_invitation_resp(dev, apdev):
+ """P2P protocol tests for invitation response processing"""
+ form(dev[0], dev[1])
+ dev[0].dump_monitor()
+ dev[1].dump_monitor()
+
+ dst, src, hapd, channel = start_p2p(dev, apdev)
+
+ addr0 = dev[0].p2p_dev_addr()
+ addr1 = dev[1].p2p_dev_addr()
+ peer = dev[1].get_peer(addr0)
+
+ # P2P Invitation Response from unknown peer
+ msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=1)
+ hapd.mgmt_tx(msg)
+
+ # P2P Invitation Response from peer that is not in invitation
+ msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=2)
+ attrs = p2p_attr_status()
+ msg['payload'] += ie_p2p(attrs)
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=50 no_cck=1 action={}".format(addr0, addr0, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ time.sleep(0.1)
+
+ if "FAIL" in dev[1].request("SET ext_mgmt_frame_handling 1"):
+ raise Exception("Failed to enable external management frame handling")
+
+ invite(dev[0], dev[1])
+ rx_msg = dev[1].mgmt_rx()
+ if rx_msg is None:
+ raise Exception("MGMT-RX timeout")
+ p2p = parse_p2p_public_action(rx_msg['payload'])
+ if p2p is None:
+ raise Exception("Not a P2P Public Action frame " + str(dialog_token))
+ if p2p['subtype'] != P2P_INVITATION_REQ:
+ raise Exception("Unexpected subtype %d" % p2p['subtype'])
+
+ # Invalid attribute to cause p2p_parse() failure
+ msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=p2p['dialog_token'])
+ attrs = struct.pack("<BB", P2P_ATTR_CAPABILITY, 0)
+ msg['payload'] += ie_p2p(attrs)
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=50 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+
+ invite(dev[0], dev[1])
+ rx_msg = dev[1].mgmt_rx()
+ if rx_msg is None:
+ raise Exception("MGMT-RX timeout")
+ p2p = parse_p2p_public_action(rx_msg['payload'])
+ if p2p is None:
+ raise Exception("Not a P2P Public Action frame " + str(dialog_token))
+ if p2p['subtype'] != P2P_INVITATION_REQ:
+ raise Exception("Unexpected subtype %d" % p2p['subtype'])
+
+ # missing mandatory Status attribute
+ msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=p2p['dialog_token'])
+ attrs = p2p_attr_channel_list()
+ msg['payload'] += ie_p2p(attrs)
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=50 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+
+ invite(dev[0], dev[1])
+ rx_msg = dev[1].mgmt_rx()
+ if rx_msg is None:
+ raise Exception("MGMT-RX timeout")
+ p2p = parse_p2p_public_action(rx_msg['payload'])
+ if p2p is None:
+ raise Exception("Not a P2P Public Action frame " + str(dialog_token))
+ if p2p['subtype'] != P2P_INVITATION_REQ:
+ raise Exception("Unexpected subtype %d" % p2p['subtype'])
+
+ # no channel match (no common channel found at all)
+ msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=p2p['dialog_token'])
+ attrs = p2p_attr_status()
+ attrs += struct.pack("<BH3BBBB", P2P_ATTR_CHANNEL_LIST, 6,
+ 0x58, 0x58, 0x04,
+ 81, 1, 15)
+ msg['payload'] += ie_p2p(attrs)
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=50 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+
+ invite(dev[0], dev[1])
+ rx_msg = dev[1].mgmt_rx()
+ if rx_msg is None:
+ raise Exception("MGMT-RX timeout")
+ p2p = parse_p2p_public_action(rx_msg['payload'])
+ if p2p is None:
+ raise Exception("Not a P2P Public Action frame " + str(dialog_token))
+ if p2p['subtype'] != P2P_INVITATION_REQ:
+ raise Exception("Unexpected subtype %d" % p2p['subtype'])
+
+ # no channel match (no acceptable P2P channel)
+ msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=p2p['dialog_token'])
+ attrs = p2p_attr_status()
+ attrs += struct.pack("<BH3BBBB", P2P_ATTR_CHANNEL_LIST, 6,
+ 0x58, 0x58, 0x04,
+ 81, 1, 12)
+ msg['payload'] += ie_p2p(attrs)
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=50 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+
+ invite(dev[0], dev[1])
+ rx_msg = dev[1].mgmt_rx()
+ if rx_msg is None:
+ raise Exception("MGMT-RX timeout")
+ p2p = parse_p2p_public_action(rx_msg['payload'])
+ if p2p is None:
+ raise Exception("Not a P2P Public Action frame " + str(dialog_token))
+ if p2p['subtype'] != P2P_INVITATION_REQ:
+ raise Exception("Unexpected subtype %d" % p2p['subtype'])
+
+ # missing mandatory Channel List attribute (ignored as a workaround)
+ msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=p2p['dialog_token'])
+ attrs = p2p_attr_status()
+ msg['payload'] += ie_p2p(attrs)
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=50 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+
+ ev = dev[0].wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
+ if ev is None:
+ raise Exception("Group was not started")
+
def test_p2p_msg_pd_req(dev, apdev):
"""P2P protocol tests for provision discovery request processing"""
dst, src, hapd, channel = start_p2p(dev, apdev)