# This software may be distributed under the terms of the BSD license.
# See README for more details.
+import binascii
+import struct
import time
import subprocess
import logging
logger = logging.getLogger()
import os
import os.path
+import socket
import subprocess
import hostapd
dev[0].hs20_enable()
subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
- subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:cccc::1/64',
- 'dev', dev[0].ifname])
- subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:dddd::1/64',
- 'dev', dev[1].ifname])
- subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:eeee::1/64',
- 'dev', dev[1].ifname])
id = dev[0].add_cred_values({ 'realm': "example.com",
'username': "hs20-test",
scan_freq="2412")
time.sleep(0.1)
- subprocess.call(['ping6', 'aaaa:bbbb:cccc::2', '-c', '1', '-w', '1'],
- stdout=open('/dev/null', 'w'))
+ addr0 = dev[0].p2p_interface_addr()
+ addr1 = dev[1].p2p_interface_addr()
+
+ src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':',''))
+ src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':',''))
+
+ pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
+ opt=src_ll_opt0)
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
+ pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:dddd::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:dddd::2",
+ opt=src_ll_opt1)
+ if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
+ pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:eeee::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:eeee::2",
+ opt=src_ll_opt1)
+ if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
matches = get_permanent_neighbors("ap-br0")
logger.info("After connect: " + str(matches))
- if len(matches) < 1:
- raise Exception("No neighbor entries after connect")
+ if len(matches) != 3:
+ raise Exception("Unexpected number of neighbor entries after connect")
+ if 'aaaa:bbbb:cccc::2 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches:
+ raise Exception("dev0 addr missing")
+ if 'aaaa:bbbb:dddd::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches:
+ raise Exception("dev1 addr(1) missing")
+ if 'aaaa:bbbb:eeee::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches:
+ raise Exception("dev1 addr(2) missing")
dev[0].request("DISCONNECT")
dev[1].request("DISCONNECT")
time.sleep(0.5)
stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'ap-br0'],
stderr=open('/dev/null', 'w'))
- subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:cccc::1/64',
- 'dev', dev[0].ifname])
- subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:dddd::1/64',
- 'dev', dev[1].ifname])
- subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:eeee::1/64',
- 'dev', dev[1].ifname])
return res
+def ip_checksum(buf):
+ sum = 0
+ if len(buf) & 0x01:
+ buf += '\0x00'
+ for i in range(0, len(buf), 2):
+ val, = struct.unpack('H', buf[i:i+2])
+ sum += val
+ while (sum >> 16):
+ sum = (sum & 0xffff) + (sum >> 16)
+ return struct.pack('H', ~sum & 0xffff)
+
+def build_icmpv6(ipv6_addrs, type, code, payload):
+ start = struct.pack("BB", type, code)
+ end = payload
+ icmp = start + '\x00\x00' + end
+ pseudo = ipv6_addrs + struct.pack(">LBBBB", len(icmp), 0, 0, 0, 58)
+ csum = ip_checksum(pseudo + icmp)
+ return start + csum + end
+
+def build_ns(src_ll, ip_src, ip_dst, target, opt=None):
+ link_mc = binascii.unhexlify("3333ff000002")
+ _src_ll = binascii.unhexlify(src_ll.replace(':',''))
+ proto = '\x86\xdd'
+ ehdr = link_mc + _src_ll + proto
+ _ip_src = socket.inet_pton(socket.AF_INET6, ip_src)
+ _ip_dst = socket.inet_pton(socket.AF_INET6, ip_dst)
+
+ reserved = '\x00\x00\x00\x00'
+ _target = socket.inet_pton(socket.AF_INET6, target)
+ if opt:
+ payload = reserved + _target + opt
+ else:
+ payload = reserved + _target
+ icmp = build_icmpv6(_ip_src + _ip_dst, 135, 0, payload)
+
+ ipv6 = struct.pack('>BBBBHBB', 0x60, 0, 0, 0, len(icmp), 58, 255)
+ ipv6 += _ip_src + _ip_dst
+
+ return ehdr + ipv6 + icmp
+
def get_permanent_neighbors(ifname):
cmd = subprocess.Popen(['ip', 'nei'], stdout=subprocess.PIPE)
res = cmd.stdout.read()
subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
- subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:cccc::1/64',
- 'dev', dev[0].ifname])
- subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:dddd::1/64',
- 'dev', dev[1].ifname])
- subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:eeee::1/64',
- 'dev', dev[1].ifname])
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
time.sleep(0.1)
- subprocess.call(['ping6', 'aaaa:bbbb:cccc::2', '-c', '1', '-w', '1'],
- stdout=open('/dev/null', 'w'))
- subprocess.call(['ping6', 'aaaa:bbbb:dddd::2', '-c', '1', '-w', '1'],
- stdout=open('/dev/null', 'w'))
- subprocess.call(['ping6', 'aaaa:bbbb:eeee::2', '-c', '1', '-w', '1'],
- stdout=open('/dev/null', 'w'))
+ addr0 = dev[0].p2p_interface_addr()
+ addr1 = dev[1].p2p_interface_addr()
+
+ src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':',''))
+ src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':',''))
+
+ # DAD NS
+ pkt = build_ns(src_ll=addr0, ip_src="::", ip_dst="ff02::1:ff00:2",
+ target="aaaa:bbbb:cccc::2", opt=src_ll_opt0)
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
+ pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
+ opt=src_ll_opt0)
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+ # test frame without source link-layer address option
+ pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2")
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+ # test frame with bogus option
+ pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
+ opt="\x70\x01\x01\x02\x03\x04\x05\x05")
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+ # test frame with truncated source link-layer address option
+ pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
+ opt="\x01\x01\x01\x02\x03\x04")
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+ # test frame with foreign source link-layer address option
+ pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
+ opt="\x01\x01\x01\x02\x03\x04\x05\x06")
+ if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
+ pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:dddd::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:dddd::2",
+ opt=src_ll_opt1)
+ if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
+ pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:eeee::2",
+ ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:eeee::2",
+ opt=src_ll_opt1)
+ if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+ # another copy for additional code coverage
+ if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
matches = get_permanent_neighbors("ap-br0")
logger.info("After connect: " + str(matches))
if len(matches) != 3:
raise Exception("Unexpected number of neighbor entries after connect")
+ if 'aaaa:bbbb:cccc::2 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches:
+ raise Exception("dev0 addr missing")
+ if 'aaaa:bbbb:dddd::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches:
+ raise Exception("dev1 addr(1) missing")
+ if 'aaaa:bbbb:eeee::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches:
+ raise Exception("dev1 addr(2) missing")
+
dev[0].request("DISCONNECT")
dev[1].request("DISCONNECT")
time.sleep(0.5)
stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'ap-br0'],
stderr=open('/dev/null', 'w'))
- subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:cccc::1/64',
- 'dev', dev[0].ifname])
- subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:dddd::1/64',
- 'dev', dev[1].ifname])
- subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:eeee::1/64',
- 'dev', dev[1].ifname])
return res