def mgmt_tx(self, msg):
t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
hdr = struct.pack('<HH6B6B6BH', *t)
- if "OK" not in self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload'])):
+ res = self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']).decode())
+ if "OK" not in res:
raise Exception("MGMT_TX command to hostapd failed")
def get_sta(self, addr, info=None, next=False):
hapd1ap.mgmt_tx(msg)
logger.info("No R0KH-ID subelem in FTIE")
- snonce = binascii.hexlify(req['payload'][111:111+32])
+ snonce = binascii.hexlify(req['payload'][111:111+32]).decode()
msg['payload'] = binascii.unhexlify("0602" + addrs + "00003603a1b20137520000" + "00000000000000000000000000000000" + "0000000000000000000000000000000000000000000000000000000000000000" + snonce)
hapd1ap.mgmt_tx(msg)
logger.info("No R0KH-ID subelem mismatch in FTIE")
- snonce = binascii.hexlify(req['payload'][111:111+32])
+ snonce = binascii.hexlify(req['payload'][111:111+32]).decode()
msg['payload'] = binascii.unhexlify("0602" + addrs + "00003603a1b201375e0000" + "00000000000000000000000000000000" + "0000000000000000000000000000000000000000000000000000000000000000" + snonce + "030a11223344556677889900")
hapd1ap.mgmt_tx(msg)
logger.info("No R1KH-ID subelem in FTIE")
- r0khid = binascii.hexlify(req['payload'][145:145+10])
+ r0khid = binascii.hexlify(req['payload'][145:145+10]).decode()
msg['payload'] = binascii.unhexlify("0602" + addrs + "00003603a1b201375e0000" + "00000000000000000000000000000000" + "0000000000000000000000000000000000000000000000000000000000000000" + snonce + "030a" + r0khid)
hapd1ap.mgmt_tx(msg)
logger.info("No RSNE")
- r0khid = binascii.hexlify(req['payload'][145:145+10])
+ r0khid = binascii.hexlify(req['payload'][145:145+10]).decode()
msg['payload'] = binascii.unhexlify("0602" + addrs + "00003603a1b20137660000" + "00000000000000000000000000000000" + "0000000000000000000000000000000000000000000000000000000000000000" + snonce + "030a" + r0khid + "0106000102030405")
hapd1ap.mgmt_tx(msg)
# Too short RRB frame
pkt = ehdr + '\x01'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# RRB discarded frame wikth unrecognized type
pkt = ehdr + '\x02' + '\x02' + '\x01\x00' + _src_ll
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# RRB frame too short for action frame
pkt = ehdr + '\x01' + '\x02' + '\x01\x00' + _src_ll
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Too short RRB frame (not enough room for Action Frame body)
pkt = ehdr + '\x01' + '\x02' + '\x00\x00' + _src_ll
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Unexpected Action frame category
pkt = ehdr + '\x01' + '\x02' + '\x0e\x00' + _src_ll + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Unexpected Action in RRB Request
pkt = ehdr + '\x01' + '\x00' + '\x0e\x00' + _src_ll + '\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Target AP address in RRB Request does not match with own address
pkt = ehdr + '\x01' + '\x00' + '\x0e\x00' + _src_ll + '\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Not enough room for status code in RRB Response
pkt = ehdr + '\x01' + '\x01' + '\x0e\x00' + _src_ll + '\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# RRB discarded frame with unknown packet_type
pkt = ehdr + '\x01' + '\x02' + '\x0e\x00' + _src_ll + '\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# RRB Response with non-zero status code; no STA match
pkt = ehdr + '\x01' + '\x01' + '\x10\x00' + _src_ll + '\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\xff\xff'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# RRB Response with zero status code and extra data; STA match
pkt = ehdr + '\x01' + '\x01' + '\x11\x00' + _src_ll + '\x06\x01' + _src_ll + '\x00\x00\x00\x00\x00\x00' + '\x00\x00' + '\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Too short PMK-R1 pull
pkt = ehdr + '\x01' + '\xc8' + '\x0e\x00' + _src_ll + '\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Too short PMK-R1 resp
pkt = ehdr + '\x01' + '\xc9' + '\x0e\x00' + _src_ll + '\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Too short PMK-R1 push
pkt = ehdr + '\x01' + '\xca' + '\x0e\x00' + _src_ll + '\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# No matching R0KH address found for PMK-R0 pull response
pkt = ehdr + '\x01' + '\xc9' + '\x5a\x00' + _src_ll + '\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + 76*'\00'
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
@remote_compatible
dev[0].dump_monitor()
def ie_hex(ies, id):
- return binascii.hexlify(struct.pack('BB', id, len(ies[id])) + ies[id])
+ return binascii.hexlify(struct.pack('BB', id, len(ies[id])) + ies[id]).decode()
def test_ap_ft_reassoc_proto(dev, apdev):
"""WPA2-PSK-FT AP Reassociation Request frame parsing"""
while True:
req = hapd2ap.mgmt_rx()
- hapd2ap.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']))
+ hapd2ap.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
if req['subtype'] == 11:
break
req = hapd2ap.mgmt_rx()
if req['subtype'] == 2:
break
- hapd2ap.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']))
+ hapd2ap.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
# IEEE 802.11 header + fixed fields before IEs
- hdr = binascii.hexlify(req['frame'][0:34])
+ hdr = binascii.hexlify(req['frame'][0:34]).decode()
ies = parse_ie(binascii.hexlify(req['frame'][34:]))
# First elements: SSID, Supported Rates, Extended Supported Rates
ies1 = ie_hex(ies, 0) + ie_hex(ies, 1) + ie_hex(ies, 50)
req = hapd2ap.mgmt_rx()
count += 1
hapd2ap.dump_monitor()
- hapd2ap.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']))
+ hapd2ap.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
if req['subtype'] == 2:
reassocreq = req
ev = hapd2ap.wait_event(["MGMT-TX-STATUS"], timeout=5)
logger.info("Replay the last Reassociation Request frame")
hapd2ap.dump_monitor()
hapd2ap.set("ext_mgmt_frame_handling", "1")
- hapd2ap.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']))
+ hapd2ap.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
ev = hapd2ap.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("No TX status seen")
break
icon += base64.b64decode(res)
pos += 1000
- hex = binascii.hexlify(icon)
+ hex = binascii.hexlify(icon).decode()
if not hex.startswith("0009696d6167652f706e677d1d"):
raise Exception("Unexpected beacon binary header: " + hex)
with open('w1fi_logo.png', 'r') as f:
bss = dev[0].get_bss(bssid)
if "hs20_operator_icon_metadata" not in bss:
raise Exception("hs20_operator_icon_metadata missing from BSS entry")
- if bss["hs20_operator_icon_metadata"] != binascii.hexlify(value):
+ if bss["hs20_operator_icon_metadata"] != binascii.hexlify(value).decode():
print(binascii.hexlify(value))
raise Exception("Unexpected hs20_operator_icon_metadata value: " +
bss["hs20_operator_icon_metadata"])
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
opt=src_ll_opt0)
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:dddd::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:dddd::2",
opt=src_ll_opt1)
- if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:eeee::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:eeee::2",
opt=src_ll_opt1)
- if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
matches = get_permanent_neighbors("ap-br0")
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
opt=src_ll_opt0)
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_ra(src_ll=apdev[0]['bssid'], ip_src="aaaa:bbbb:cccc::33",
ip_dst="ff01::1")
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_na(src_ll=apdev[0]['bssid'], ip_src="aaaa:bbbb:cccc::44",
ip_dst="ff01::1", target="aaaa:bbbb:cccc::55")
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_dhcp_ack(dst_ll="ff:ff:ff:ff:ff:ff", src_ll=bssid,
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.123", chaddr=addr0)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# another copy for additional code coverage
pkt = build_dhcp_ack(dst_ll=addr0, src_ll=bssid,
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.123", chaddr=addr0)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
matches = get_permanent_neighbors("ap-br0")
pkt = build_ns(src_ll=src_ll, ip_src=ip_src, ip_dst=ip_dst, target=target,
opt=opt)
- if "OK" not in dev.request(cmd + binascii.hexlify(pkt)):
+ if "OK" not in dev.request(cmd + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
def build_na(src_ll, ip_src, ip_dst, target, opt=None, flags=0):
pkt = build_na(src_ll=src_ll, ip_src=ip_src, ip_dst=ip_dst, target=target,
opt=opt)
- if "OK" not in dev.request(cmd + binascii.hexlify(pkt)):
+ if "OK" not in dev.request(cmd + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
def build_dhcp_ack(dst_ll, src_ll, ip_src, ip_dst, yiaddr, chaddr,
pkt = build_arp(dst_ll=dst_ll, src_ll=src_ll, opcode=opcode,
sender_mac=sender_mac, sender_ip=sender_ip,
target_mac=target_mac, target_ip=target_ip)
- if "OK" not in dev.request(cmd + binascii.hexlify(pkt)):
+ if "OK" not in dev.request(cmd + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
def get_permanent_neighbors(ifname):
pkt = build_dhcp_ack(dst_ll="ff:ff:ff:ff:ff:ff", src_ll=bssid,
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.124", chaddr=addr0)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Change address and verify unicast
pkt = build_dhcp_ack(dst_ll=addr0, src_ll=bssid,
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.123", chaddr=addr0,
udp_checksum=False)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Not-associated client MAC address
pkt = build_dhcp_ack(dst_ll="ff:ff:ff:ff:ff:ff", src_ll=bssid,
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.125", chaddr="22:33:44:55:66:77")
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# No IP address
pkt = build_dhcp_ack(dst_ll=addr1, src_ll=bssid,
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="0.0.0.0", chaddr=addr1)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Zero subnet mask
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.126", chaddr=addr1,
subnet_mask="0.0.0.0")
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Truncated option
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.127", chaddr=addr1,
truncated_opt=True)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Wrong magic
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.128", chaddr=addr1,
wrong_magic=True)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# Wrong IPv4 total length
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.129", chaddr=addr1,
force_tot_len=1000)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
# BOOTP
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.129", chaddr=addr1,
no_dhcp=True)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
macs = get_bridge_macs("ap-br0")
pkt = build_ra(src_ll=apdev[0]['bssid'], ip_src="aaaa:bbbb:cccc::33",
ip_dst="ff01::1")
with fail_test(hapd, 1, "x_snoop_mcast_to_ucast_convert_send"):
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
wait_fail_trigger(dev[0], "GET_FAIL")
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
opt=src_ll_opt0)
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
return parse_eapol(eapol)
def send_eapol(hapd, addr, data):
- res = hapd.request("EAPOL_RX " + addr + " " + binascii.hexlify(data))
+ res = hapd.request("EAPOL_RX " + addr + " " + binascii.hexlify(data).decode())
if "OK" not in res:
raise Exception("EAPOL_RX to hostapd failed")
return msg
def send_wsc_msg(dev, src, msg):
- res = dev.request("EAPOL_RX " + src + " " + binascii.hexlify(msg))
+ res = dev.request("EAPOL_RX " + src + " " + binascii.hexlify(msg).decode())
if "OK" not in res:
raise Exception("EAPOL_RX failed")
logger.debug("DH shared secret: " + ss)
dhkey = hashlib.sha256(binascii.unhexlify(ss)).digest()
- logger.debug("DHKey: " + binascii.hexlify(dhkey))
+ logger.debug("DHKey: " + binascii.hexlify(dhkey).decode())
m = hmac.new(dhkey, e_nonce + mac_addr + r_nonce, hashlib.sha256)
kdk = m.digest()
- logger.debug("KDK: " + binascii.hexlify(kdk))
+ logger.debug("KDK: " + binascii.hexlify(kdk).decode())
authkey,keywrapkey,emsk = wsc_keys(kdk)
- logger.debug("AuthKey: " + binascii.hexlify(authkey))
- logger.debug("KeyWrapKey: " + binascii.hexlify(keywrapkey))
- logger.debug("EMSK: " + binascii.hexlify(emsk))
+ logger.debug("AuthKey: " + binascii.hexlify(authkey).decode())
+ logger.debug("KeyWrapKey: " + binascii.hexlify(keywrapkey).decode())
+ logger.debug("EMSK: " + binascii.hexlify(emsk).decode())
return authkey,keywrapkey
def wsc_dev_pw_hash(authkey, dev_pw, e_pk, r_pk):
psk1,psk2 = wsc_dev_pw_psk(authkey, dev_pw)
- logger.debug("PSK1: " + binascii.hexlify(psk1))
- logger.debug("PSK2: " + binascii.hexlify(psk2))
+ logger.debug("PSK1: " + binascii.hexlify(psk1).decode())
+ logger.debug("PSK2: " + binascii.hexlify(psk2).decode())
# Note: Secret values are supposed to be random, but hardcoded values are
# fine for testing.
s1 = 16*'\x77'
m = hmac.new(authkey, s1 + psk1 + e_pk + r_pk, hashlib.sha256)
hash1 = m.digest()
- logger.debug("Hash1: " + binascii.hexlify(hash1))
+ logger.debug("Hash1: " + binascii.hexlify(hash1).decode())
s2 = 16*'\x88'
m = hmac.new(authkey, s2 + psk2 + e_pk + r_pk, hashlib.sha256)
hash2 = m.digest()
- logger.debug("Hash2: " + binascii.hexlify(hash2))
+ logger.debug("Hash2: " + binascii.hexlify(hash2).decode())
return s1,s2,hash1,hash2
def build_m1(eap_id, uuid_e, mac_addr, e_nonce, e_pk,
msg, m8_attrs, raw_m8_attrs = recv_wsc_msg(hapd, WSC_MSG, WPS_M8)
m8_cred = decrypt_attr_encr_settings(authkey, keywrapkey,
m8_attrs[ATTR_ENCR_SETTINGS])
- logger.debug("M8 Credential: " + binascii.hexlify(m8_cred))
+ logger.debug("M8 Credential: " + binascii.hexlify(m8_cred).decode())
logger.debug("Prepare WSC_Done")
attrs = build_wsc_attr(ATTR_VERSION, '\x10')
def nl80211_command(dev, cmd, attr):
res = dev.request("VENDOR ffffffff {} {}".format(nl80211_cmd[cmd],
- binascii.hexlify(attr)))
+ binascii.hexlify(attr).decode()))
if "FAIL" in res:
raise Exception("nl80211 command failed")
return binascii.unhexlify(res)
logger.debug("MeshPeers: " + str(res))
if len(res) != 1:
raise Exception("Unexpected number of MeshPeer values")
- if binascii.hexlify(res[0]) != addr1.replace(':', ''):
+ if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''):
raise Exception("Unexpected peer address")
res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup',
"wpa_supplicant EAP-FAST PAC file - version 1\nSTART\nI-ID=1\nEND\n",
"wpa_supplicant EAP-FAST PAC file - version 1\nSTART\nA-ID-Info=1\nEND\n" ]
for pac in tests:
- blob = binascii.hexlify(pac)
+ blob = binascii.hexlify(pac).decode()
dev[0].request("SET blob fast_pac_errors " + blob)
dev[0].connect("eap-test", key_mgmt="WPA-EAP", scan_freq="2412",
eap="FAST", anonymous_identity="FAST",
tests = [ "wpa_supplicant EAP-FAST PAC file - version 1\nSTART\nEND\n",
"wpa_supplicant EAP-FAST PAC file - version 1\nSTART\nEND\nSTART\nEND\nSTART\nEND\n" ]
for pac in tests:
- blob = binascii.hexlify(pac)
+ blob = binascii.hexlify(pac).decode()
dev[0].request("SET blob fast_pac_errors " + blob)
dev[0].connect("eap-test", key_mgmt="WPA-EAP", scan_freq="2412",
eap="FAST", anonymous_identity="FAST",
count = 0
for realm in fils_realms:
hash = hashlib.sha256(realm.lower()).digest()
- expected += binascii.hexlify(hash[0:2])
+ expected += binascii.hexlify(hash[0:2]).decode()
count += 1
if count == 7:
break
expected = ''
for realm in fils_realms:
hash = hashlib.sha256(realm.lower()).digest()
- expected += binascii.hexlify(hash[0:2])
+ expected += binascii.hexlify(hash[0:2]).decode()
if info != expected:
raise Exception("Unexpected FILS Realm Info ANQP-element: " + info)
tests = [ "ff:ff:ff:ff:ff:ff aabb",
"ff:ff:ff:ff:ff:ff " + 255*'cc',
hapd.own_addr() + " ddee010203040506070809",
- "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc) ]
+ "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc).decode() ]
for t in tests:
if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + t):
raise Exception("FILS_HLP_REQ_ADD failed: " + t)
raise Exception("Failed to flush pending FILS HLP requests")
dhcpdisc = build_dhcp(req=True, dhcp_msg=DHCPDISCOVER,
chaddr=dev[0].own_addr())
- if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc)):
+ if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc).decode()):
raise Exception("FILS_HLP_REQ_ADD failed")
id = dev[0].connect("fils", key_mgmt="FILS-SHA256",
eap="PSK", identity="psk.user@example.com",
raise Exception("Failed to flush pending FILS HLP requests")
dhcpdisc = build_dhcp(req=True, dhcp_msg=DHCPDISCOVER,
chaddr=dev[0].own_addr())
- if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc)):
+ if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc).decode()):
raise Exception("FILS_HLP_REQ_ADD failed")
id = dev[0].connect("fils", key_mgmt="FILS-SHA256",
eap="PSK", identity="psk.user@example.com",
tests = [ "ff",
"0800",
"0800" + 20*"00",
- "0800" + binascii.hexlify(ipv4_overflow),
- "0800" + binascii.hexlify(ipv4_unknown_proto),
- "0800" + binascii.hexlify(ipv4_missing_udp_hdr),
- "0800" + binascii.hexlify(udp_overflow),
- "0800" + binascii.hexlify(udp_underflow),
- "0800" + binascii.hexlify(udp_unknown_port),
- "0800" + binascii.hexlify(dhcp_missing_data),
- binascii.hexlify(dhcp_not_req),
- binascii.hexlify(dhcp_no_magic),
- binascii.hexlify(dhcp_unknown_magic) ]
+ "0800" + binascii.hexlify(ipv4_overflow).decode(),
+ "0800" + binascii.hexlify(ipv4_unknown_proto).decode(),
+ "0800" + binascii.hexlify(ipv4_missing_udp_hdr).decode(),
+ "0800" + binascii.hexlify(udp_overflow).decode(),
+ "0800" + binascii.hexlify(udp_underflow).decode(),
+ "0800" + binascii.hexlify(udp_unknown_port).decode(),
+ "0800" + binascii.hexlify(dhcp_missing_data).decode(),
+ binascii.hexlify(dhcp_not_req).decode(),
+ binascii.hexlify(dhcp_no_magic).decode(),
+ binascii.hexlify(dhcp_unknown_magic).decode() ]
for t in tests:
if "OK" not in dev[0].request("FILS_HLP_REQ_ADD ff:ff:ff:ff:ff:ff " + t):
raise Exception("FILS_HLP_REQ_ADD failed: " + t)
dev[0].wait_disconnected()
dev[0].request("FILS_HLP_REQ_FLUSH")
- tests = [ binascii.hexlify(dhcp_opts),
- binascii.hexlify(dhcp_opts2) ]
+ tests = [ binascii.hexlify(dhcp_opts).decode(),
+ binascii.hexlify(dhcp_opts2).decode() ]
for t in tests:
if "OK" not in dev[0].request("FILS_HLP_REQ_ADD ff:ff:ff:ff:ff:ff " + t):
raise Exception("FILS_HLP_REQ_ADD failed: " + t)
dev[0].wait_disconnected()
dev[0].request("FILS_HLP_REQ_FLUSH")
- if "OK" not in dev[0].request("FILS_HLP_REQ_ADD ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcp_valid)):
+ if "OK" not in dev[0].request("FILS_HLP_REQ_ADD ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcp_valid).decode()):
raise Exception("FILS_HLP_REQ_ADD failed")
hapd.set("own_ip_addr", "0.0.0.0")
dev[0].select_network(id, freq=2412)
raise Exception("Failed to flush pending FILS HLP requests")
dhcpdisc = build_dhcp(req=True, dhcp_msg=DHCPDISCOVER,
chaddr=dev[0].own_addr())
- if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc)):
+ if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc).decode()):
raise Exception("FILS_HLP_REQ_ADD failed")
id = dev[0].connect("fils", key_mgmt="FILS-SHA256",
eap="PSK", identity="psk.user@example.com",
dhcpdisc = build_dhcp(req=True, dhcp_msg=DHCPDISCOVER,
chaddr=dev[0].own_addr(),
extra_op="\x00\x11", opt_end=False)
- if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc)):
+ if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc).decode()):
raise Exception("FILS_HLP_REQ_ADD failed")
dev[0].dump_monitor()
dev[0].select_network(id, freq=2412)
dhcpdisc = build_dhcp(req=True, dhcp_msg=DHCPDISCOVER,
chaddr=dev[0].own_addr(),
extra_op="\x11\x01", opt_end=False)
- if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc)):
+ if "OK" not in dev[0].request("FILS_HLP_REQ_ADD " + "ff:ff:ff:ff:ff:ff " + binascii.hexlify(dhcpdisc).decode()):
raise Exception("FILS_HLP_REQ_ADD failed")
dev[0].dump_monitor()
dev[0].select_network(id, freq=2412)
req = hapd.mgmt_rx()
count += 1
hapd.dump_monitor()
- hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']))
+ hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
if req['subtype'] == 0:
assocreq = req
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
logger.info("Replay the last Association Request frame")
hapd.dump_monitor()
hapd.set("ext_mgmt_frame_handling", "1")
- hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']))
+ hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("No TX status seen")
for dialog_token in range(1, 10):
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_INITIAL_REQUEST,
dialog_token) + anqp_adv_proto() + gas
- req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg))
+ req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg).decode())
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
resp = wpas.mgmt_rx()
raise Exception("Failed to enable external management frame handling")
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_COMEBACK_REQUEST, 1)
- req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg))
+ req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg).decode())
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
resp = wpas.mgmt_rx()
held = struct.pack('BBB', 0, 1 + len(held_uri), 1) + held_uri
supl_fqdn = "supl.example.com"
supl = struct.pack('BBB', 0, 1 + len(supl_fqdn), 1) + supl_fqdn
- public_id = binascii.hexlify(held + supl)
+ public_id = binascii.hexlify(held + supl).decode()
params = { "ssid": "gas/anqp",
"interworking": "1",
"anqp_elem": [ "265:" + geo_loc,
name2 = "Esimerkkipaikka"
venue1 = struct.pack('B', len(lang1 + name1)) + lang1 + name1
venue2 = struct.pack('B', len(lang2 + name2)) + lang2 + name2
- venue_name = binascii.hexlify(venue_info + venue1 + venue2)
+ venue_name = binascii.hexlify(venue_info + venue1 + venue2).decode()
url1 = "http://example.com/venue"
url2 = "https://example.org/venue-info/"
duple1 = struct.pack('BB', 1 + len(url1), 1) + url1
duple2 = struct.pack('BB', 1 + len(url2), 2) + url2
- venue_url = binascii.hexlify(duple1 + duple2)
+ venue_url = binascii.hexlify(duple1 + duple2).decode()
params = { "ssid": "gas/anqp",
"interworking": "1",
if 'anqp_capability_list' not in bss:
raise Exception("Capability List ANQP-element not seen")
ids = struct.pack('<HHH', 257, 258, 277)
- if not bss['anqp_capability_list'].startswith(binascii.hexlify(ids)):
+ if not bss['anqp_capability_list'].startswith(binascii.hexlify(ids).decode()):
raise Exception("Unexpected Capability List ANQP-element value: " + bss['anqp_capability_list'])
def test_gas_anqp_venue_url2(dev, apdev):
name2 = "Esimerkkipaikka"
venue1 = struct.pack('B', len(lang1 + name1)) + lang1 + name1
venue2 = struct.pack('B', len(lang2 + name2)) + lang2 + name2
- venue_name = binascii.hexlify(venue_info + venue1 + venue2)
+ venue_name = binascii.hexlify(venue_info + venue1 + venue2).decode()
url1 = "http://example.com/venue"
url2 = "https://example.org/venue-info/"
duple1 = struct.pack('BB', 1 + len(url1), 1) + url1
duple2 = struct.pack('BB', 1 + len(url2), 2) + url2
- venue_url = binascii.hexlify(duple1 + duple2)
+ venue_url = binascii.hexlify(duple1 + duple2).decode()
params = { "ssid": "gas/anqp",
"interworking": "1",
if 'anqp_capability_list' not in bss:
raise Exception("Capability List ANQP-element not seen")
ids = struct.pack('<HHH', 257, 258, 277)
- if not bss['anqp_capability_list'].startswith(binascii.hexlify(ids)):
+ if not bss['anqp_capability_list'].startswith(binascii.hexlify(ids).decode()):
raise Exception("Unexpected Capability List ANQP-element value: " + bss['anqp_capability_list'])
def test_gas_anqp_venue_url_pmf(dev, apdev):
raise Exception("Failed to enable external management frame handling")
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_COMEBACK_REQUEST, 1)
- req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg))
+ req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg).decode())
with alloc_fail(hapd, 1,
"gas_anqp_build_comeback_resp_buf;gas_serv_rx_gas_comeback_req"):
if "OK" not in wpas.request(req):
dialog_token = 100
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_INITIAL_REQUEST,
dialog_token) + anqp_adv_proto() + gas
- req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg))
+ req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg).decode())
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
resp = wpas.mgmt_rx()
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_COMEBACK_REQUEST,
dialog_token + 1)
- req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg))
+ req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg).decode())
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
resp = wpas.mgmt_rx()
msg6 = struct.pack('<BB', ACTION_CATEG_PUBLIC, GAS_COMEBACK_REQUEST)
tests = [ msg, msg2, msg3, msg4, msg5, msg6 ]
for t in tests:
- req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(t))
+ req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(t).decode())
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
ev = wpas.wait_event(["MGMT-TX-STATUS"], timeout=5)
ap_addr = hapd.own_addr()
cl_addr = dev.own_addr()
pkt = build_arp(cl_addr, ap_addr, 2, ap_addr, '10.0.0.1', ap_addr, '10.0.0.1')
- pkt = binascii.hexlify(pkt)
+ pkt = binascii.hexlify(pkt).decode()
if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
raise Exception("DATA_TEST_FRAME failed")
cl_addr = dev.own_addr()
pkt = build_na(ap_addr, 'fdaa::2', 'ff02::1', 'fdaa::2', flags=0x20,
opt=binascii.unhexlify('0201' + ap_addr.replace(':', '')))
- pkt = binascii.hexlify(pkt)
+ pkt = binascii.hexlify(pkt).decode()
if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
raise Exception("DATA_TEST_FRAME failed")
# IEEE 802.1X tests
-# Copyright (c) 2013-2015, Jouni Malinen <j@w1.fi>
+# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
hmac_obj = hmac.new(binascii.unhexlify(signkey))
hmac_obj.update(binascii.unhexlify(frame))
sign = hmac_obj.digest()
- frame = frame_start + binascii.hexlify(sign) + frame_end
+ frame = frame_start + binascii.hexlify(sign).decode() + frame_end
dev.request("EAPOL_RX " + bssid + " " + frame)
def test_ieee8021x_eapol_key(dev, apdev):
break
else:
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % (
- binascii.hexlify(pkt['frame']), ))
+ binascii.hexlify(pkt['frame']).decode(), ))
hapd.set("ext_mgmt_frame_handling", "0")
hapd.request("STOP_AP")
attrs += p2p_attr_operating_channel()
wsc_attrs = struct.pack(">HHH", 0x1012, 2, 4)
msg['payload'] += ie_p2p(attrs) + ie_wsc(wsc_attrs)
- mgmt_tx(lower, "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr_high, addr_high, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(lower, "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr_high, addr_high, peer['listen_freq'], binascii.hexlify(msg['payload']).decode()))
# Wait for the GO Negotiation Response frame which would have been sent in
# this case previously, but not anymore after the check for
attrs += p2p_attr_device_info(addr1, config_methods=0x0108)
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, peer['listen_freq'], binascii.hexlify(msg['payload']).decode()))
rx_msg = dev[1].mgmt_rx()
if rx_msg is None:
attrs += p2p_attr_device_info(addr1, config_methods=0x0108)
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, peer['listen_freq'], binascii.hexlify(msg['payload']).decode()))
rx_msg = dev[1].mgmt_rx()
if rx_msg is None:
msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=2)
attrs = p2p_attr_status()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, peer['listen_freq'], binascii.hexlify(msg['payload']).decode()))
time.sleep(0.25)
if "FAIL" in dev[1].request("SET ext_mgmt_frame_handling 1"):
msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=p2p['dialog_token'])
attrs = struct.pack("<BB", P2P_ATTR_CAPABILITY, 0)
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
invite(dev[0], dev[1])
rx_msg = dev[1].mgmt_rx()
msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=p2p['dialog_token'])
attrs = p2p_attr_channel_list()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
invite(dev[0], dev[1])
rx_msg = dev[1].mgmt_rx()
0x58, 0x58, 0x04,
81, 1, 15)
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
invite(dev[0], dev[1])
rx_msg = dev[1].mgmt_rx()
0x58, 0x58, 0x04,
81, 1, 12)
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
invite(dev[0], dev[1])
rx_msg = dev[1].mgmt_rx()
msg = p2p_hdr(dst, src, type=P2P_INVITATION_RESP, dialog_token=p2p['dialog_token'])
attrs = p2p_attr_status()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
ev = dev[0].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
if ev is None:
dialog_token=p2p['dialog_token'])
attrs = p2p_attr_status(status=P2P_SC_FAIL_NO_COMMON_CHANNELS)
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
ev = dev[0].wait_global_event(["P2P-INVITATION-RESULT"], timeout=15)
if ev is None:
raise Exception("Timeout on invitation result")
msg['payload'] += ie_p2p(attrs)
if "FAIL" in dev[1].request("SET ext_mgmt_frame_handling 0"):
raise Exception("Failed to disable external management frame handling")
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
ev = dev[0].wait_global_event(["P2P-INVITATION-RESULT"], timeout=15)
if ev is None:
raise Exception("Timeout on invitation result")
dialog_token=p2p['dialog_token'])
attrs = p2p_attr_status(status=P2P_SC_FAIL_NO_COMMON_CHANNELS)
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
rx_msg = dev[1].mgmt_rx()
if rx_msg is None:
raise Exception("Unexpected subtype %d" % p2p['subtype'])
logger.info("Retransmit duplicate of previous response")
- mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[1], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode()))
logger.info("Transmit real response")
msg = p2p_hdr(addr0, addr1, type=P2P_INVITATION_RESP,
attrs = p2p_attr_status(status=P2P_SC_SUCCESS)
attrs += p2p_attr_channel_list()
msg['payload'] += ie_p2p(attrs)
- if "FAIL" in dev[1].request("MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']))):
+ if "FAIL" in dev[1].request("MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr0, addr0, rx_msg['freq'], binascii.hexlify(msg['payload']).decode())):
raise Exception("Failed to transmit real response")
dev[1].request("SET ext_mgmt_frame_handling 0")
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(addr1, addr1, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(
+ addr1, addr1, peer['listen_freq'], binascii.hexlify(msg['payload']).decode()))
ev = dev[1].wait_global_event(["P2P-GO-NEG-FAILURE"], timeout=5)
if ev is None:
attrs += p2p_attr_device_info(addr0, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(addr1, addr1, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(
+ addr1, addr1, peer['listen_freq'], binascii.hexlify(msg['payload']).decode()))
dev[0].p2p_listen()
dev[1].discover_peer(addr0)
if ev is None:
raise Exception("Timeout on GO Neg Req")
dev[0].p2p_stop_find()
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(addr1, addr1, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(
+ addr1, addr1, peer['listen_freq'], binascii.hexlify(msg['payload']).decode()))
dev[0].dump_monitor()
dev[1].dump_monitor()
msg = p2p_hdr(addr1, addr0, type=P2P_GO_NEG_RESP, dialog_token=197)
attrs = struct.pack("<BB", P2P_ATTR_CAPABILITY, 0)
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(addr1, addr1, peer['listen_freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(
+ addr1, addr1, peer['listen_freq'], binascii.hexlify(msg['payload']).decode()))
frame = dev[0].mgmt_rx(timeout=0.1)
if frame is not None:
raise Exception("Unexpected GO Neg Confirm")
attrs += p2p_attr_device_info(addr0, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
dev[0].dump_monitor()
dev[1].dump_monitor()
attrs += p2p_attr_device_info(addr0, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_INVALID_PARAMS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_INVALID_PARAMS, dialog_token)
dev[0].dump_monitor()
#attrs += p2p_attr_device_info(addr0, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_INVALID_PARAMS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_INVALID_PARAMS, dialog_token)
dev[0].dump_monitor()
attrs += p2p_attr_device_info(addr0, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_INVALID_PARAMS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_INVALID_PARAMS, dialog_token)
dev[0].dump_monitor()
attrs += p2p_attr_device_info(addr0, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_INVALID_PARAMS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_INVALID_PARAMS, dialog_token)
dev[0].dump_monitor()
attrs += p2p_attr_device_info(addr0, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_INCOMPATIBLE_PARAMS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_INCOMPATIBLE_PARAMS, dialog_token)
dev[0].dump_monitor()
attrs += p2p_attr_operating_channel()
#attrs += p2p_attr_group_id(src, "DIRECT-foo")
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_INVALID_PARAMS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_INVALID_PARAMS, dialog_token)
dev[0].dump_monitor()
#attrs += p2p_attr_operating_channel()
attrs += p2p_attr_group_id(addr0, "DIRECT-foo")
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_INVALID_PARAMS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_INVALID_PARAMS, dialog_token)
dev[0].dump_monitor()
attrs += p2p_attr_operating_channel()
attrs += p2p_attr_group_id(addr0, "DIRECT-foo")
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_INVALID_PARAMS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_INVALID_PARAMS, dialog_token)
dev[0].dump_monitor()
attrs += p2p_attr_operating_channel()
attrs += p2p_attr_group_id(addr0, "DIRECT-foo")
msg['payload'] += ie_p2p(attrs)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq={} wait_time=200 no_cck=1 action={}".format(
+ addr1, addr1, p2p['freq'], binascii.hexlify(msg['payload']).decode()))
check_p2p_go_neg_fail_event(dev[1], P2P_SC_FAIL_NO_COMMON_CHANNELS)
rx_go_neg_conf(dev[0], P2P_SC_FAIL_NO_COMMON_CHANNELS, dialog_token)
dev[0].dump_monitor()
pkt = build_dhcp_ack(dst_ll="ff:ff:ff:ff:ff:ff", src_ll=bssid,
ip_src="192.168.1.1", ip_dst="255.255.255.255",
yiaddr="192.168.1.123", chaddr=addr0)
- if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt)):
+ if "OK" not in hapd.request("DATA_TEST_FRAME ifname=ap-br0 " + binascii.hexlify(pkt).decode()):
raise Exception("DATA_TEST_FRAME failed")
dev[0].request("DISCONNECT")
def __str__(self):
txt = "opclass={} channel={} start={} duration={} frame_info={} rcpi={} rsni={} bssid={} antenna_id={} parent_tsf={}".format(self.opclass, self.channel, self.start, self.duration, self.frame_info, self.rcpi, self.rsni, self.bssid_str, self.antenna_id, self.parent_tsf)
if self.frame_body:
- txt += " frame_body=" + binascii.hexlify(self.frame_body)
+ txt += " frame_body=" + binascii.hexlify(self.frame_body).decode()
if self.frame_body_fragment_id:
- txt += " fragment_id=" + binascii.hexlify(self.frame_body_fragment_id)
+ txt += " fragment_id=" + binascii.hexlify(self.frame_body_fragment_id).decode()
if self.last_indication:
- txt += " last_indication=" + binascii.hexlify(self.last_indication)
+ txt += " last_indication=" + binascii.hexlify(self.last_indication).decode()
return txt
if not report.last_indication:
raise Exception("Last Beacon Report Indication subelement missing")
- last = binascii.hexlify(report.last_indication)
+ last = binascii.hexlify(report.last_indication).decode()
if last != '01':
raise Exception("last beacon report indication is not set on last frame")
ACTION_CATEG_WNM, WNM_ACT_SLEEP_MODE_REQ, 0,
WLAN_EID_WNMSLEEP, 4, WNM_SLEEP_MODE_EXIT, 0, 0,
WLAN_EID_TFS_REQ, 0)
- mgmt_tx(dev[0], "MGMT_TX {} {} freq=2412 wait_time=200 no_cck=1 action={}".format(msg['da'], msg['bssid'], binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq=2412 wait_time=200 no_cck=1 action={}".format(
+ msg['da'], msg['bssid'], binascii.hexlify(msg['payload']).decode()))
ev = hapd.wait_event([ "OCV failed" ], timeout=5)
if ev is None:
raise Exception("AP did not report missing OCI element")
oci_ie = struct.pack("<BBB", 81, 2, 0)
msg['payload'] += struct.pack("<BBB", WLAN_EID_EXTENSION, 1 + len(oci_ie),
WLAN_EID_EXT_OCV_OCI) + oci_ie
- mgmt_tx(dev[0], "MGMT_TX {} {} freq=2412 wait_time=200 no_cck=1 action={}".format(msg['da'], msg['bssid'] , binascii.hexlify(msg['payload'])))
+ mgmt_tx(dev[0], "MGMT_TX {} {} freq=2412 wait_time=200 no_cck=1 action={}".format(
+ msg['da'], msg['bssid'] , binascii.hexlify(msg['payload']).decode()))
ev = hapd.wait_event([ "OCV failed" ], timeout=5)
if ev is None:
raise Exception("AP did not report bad OCI element")
vals = ev.split(' ')
if vals[2] != '1' or vals[3] != '5':
raise Exception("Unexpected request values: " + ev)
- dev[0].set("coloc_intf_elems", binascii.hexlify(no_intf))
+ dev[0].set("coloc_intf_elems", binascii.hexlify(no_intf).decode())
ev = hapd.wait_event(["COLOC-INTF-REPORT"], timeout=1)
if ev is None:
raise Exception("No Collocated Interference Report frame seen")
- if addr + " 1 " + binascii.hexlify(no_intf) not in ev:
+ if addr + " 1 " + binascii.hexlify(no_intf).decode() not in ev:
raise Exception("Unexpected report values: " + ev)
if "OK" not in hapd.request("COLOC_INTF_REQ %s 0 0" % addr):
if vals[2] != '0' or vals[3] != '0':
raise Exception("Unexpected request values: " + ev)
- res = dev[0].request("COLOC_INTF_REPORT " + binascii.hexlify(no_intf))
+ res = dev[0].request("COLOC_INTF_REPORT " + binascii.hexlify(no_intf).decode())
if "OK" not in res:
raise Exception("Could not send unsolicited report")
ev = hapd.wait_event(["COLOC-INTF-REPORT"], timeout=1)
if ev is None:
raise Exception("No Collocated Interference Report frame seen")
- if addr + " 0 " + binascii.hexlify(no_intf) not in ev:
+ if addr + " 0 " + binascii.hexlify(no_intf).decode() not in ev:
raise Exception("Unexpected report values: " + ev)
finally:
dev[0].set("coloc_intf_reporting", "0")