import subprocess
import hostapd
+from utils import HwsimSkip
+import hwsim_utils
from wlantest import Wlantest
from wpasupplicant import WpaSupplicant
def check_auto_select(dev, bssid):
dev.scan_for_bss(bssid, freq="2412")
dev.request("INTERWORKING_SELECT auto freq=2412")
- ev = dev.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ ev = dev.wait_connected(timeout=15)
if bssid not in ev:
raise Exception("Connected to incorrect network")
dev.request("REMOVE_NETWORK all")
def hlr_auc_gw_available():
if not os.path.exists("/tmp/hlr_auc_gw.sock"):
- logger.info("No hlr_auc_gw available");
- return False
+ raise HwsimSkip("No hlr_auc_gw socket available")
if not os.path.exists("../../hostapd/hlr_auc_gw"):
- logger.info("No hlr_auc_gw available");
- return False
- return True
+ raise HwsimSkip("No hlr_auc_gw available")
def interworking_ext_sim_connect(dev, bssid, method):
dev.request("INTERWORKING_CONNECT " + bssid)
resp = res.split(' ')[2].rstrip()
dev.request("CTRL-RSP-SIM-" + id + ":GSM-AUTH:" + resp)
- ev = dev.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ dev.wait_connected(timeout=15)
def interworking_connect(dev, bssid, method):
dev.request("INTERWORKING_CONNECT " + bssid)
if "(" + method + ")" not in ev:
raise Exception("Unexpected EAP method selection")
- ev = dev.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ dev.wait_connected(timeout=15)
def check_probe_resp(wt, bssid_unexpected, bssid_expected):
if bssid_unexpected:
def test_ap_interworking_scan_filtering(dev, apdev):
"""Interworking scan filtering with HESSID and access network type"""
try:
- return _test_ap_interworking_scan_filtering(dev, apdev)
+ _test_ap_interworking_scan_filtering(dev, apdev)
finally:
dev[0].request("SET hessid 00:00:00:00:00:00")
dev[0].request("SET access_network_type 15")
def test_ap_hs20_sim(dev, apdev):
"""Hotspot 2.0 with simulated SIM and EAP-SIM"""
- if not hlr_auc_gw_available():
- return "skip"
+ hlr_auc_gw_available()
hs20_simulated_sim(dev[0], apdev[0], "SIM")
dev[0].request("INTERWORKING_SELECT auto freq=2412")
ev = dev[0].wait_event(["INTERWORKING-ALREADY-CONNECTED"], timeout=15)
def test_ap_hs20_aka(dev, apdev):
"""Hotspot 2.0 with simulated USIM and EAP-AKA"""
- if not hlr_auc_gw_available():
- return "skip"
+ hlr_auc_gw_available()
hs20_simulated_sim(dev[0], apdev[0], "AKA")
def test_ap_hs20_aka_prime(dev, apdev):
"""Hotspot 2.0 with simulated USIM and EAP-AKA'"""
- if not hlr_auc_gw_available():
- return "skip"
+ hlr_auc_gw_available()
hs20_simulated_sim(dev[0], apdev[0], "AKA'")
def test_ap_hs20_ext_sim(dev, apdev):
"""Hotspot 2.0 with external SIM processing"""
- if not hlr_auc_gw_available():
- return "skip"
+ hlr_auc_gw_available()
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
def test_ap_hs20_ext_sim_roaming(dev, apdev):
"""Hotspot 2.0 with external SIM processing in roaming network"""
- if not hlr_auc_gw_available():
- return "skip"
+ hlr_auc_gw_available()
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
'domain': "example.com",
'update_identifier': "1234" })
dev[0].request("REASSOCIATE")
- ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ dev[0].wait_connected(timeout=15)
check_sp_type(dev[0], "home")
status = dev[0].get_status()
if status['pairwise_cipher'] != "CCMP":
raise Exception("Selected incorrect BSS")
break
- ev = dev.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ ev = dev.wait_connected(timeout=15)
if bssid and bssid not in ev:
raise Exception("Connected to incorrect BSS")
def test_ap_hs20_multi_cred_sp_prio(dev, apdev):
"""Hotspot 2.0 multi-cred sp_priority"""
try:
- return _test_ap_hs20_multi_cred_sp_prio(dev, apdev)
+ _test_ap_hs20_multi_cred_sp_prio(dev, apdev)
finally:
dev[0].request("SET external_sim 0")
def _test_ap_hs20_multi_cred_sp_prio(dev, apdev):
- if not hlr_auc_gw_available():
- return "skip"
+ hlr_auc_gw_available()
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
def test_ap_hs20_multi_cred_sp_prio2(dev, apdev):
"""Hotspot 2.0 multi-cred sp_priority with two BSSes"""
try:
- return _test_ap_hs20_multi_cred_sp_prio2(dev, apdev)
+ _test_ap_hs20_multi_cred_sp_prio2(dev, apdev)
finally:
dev[0].request("SET external_sim 0")
def _test_ap_hs20_multi_cred_sp_prio2(dev, apdev):
- if not hlr_auc_gw_available():
- return "skip"
+ hlr_auc_gw_available()
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
if "1 120 http://example.com/" not in ev:
raise Exception("Unexpected deauth imminent notice: " + ev)
hapd.request("DEAUTHENTICATE " + addr)
- ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
- if ev is None:
- raise Exception("Timeout on disconnection")
+ dev[0].wait_disconnected(timeout=10)
if "[TEMP-DISABLED]" not in dev[0].list_networks()[0]['flags']:
raise Exception("Network not marked temporarily disabled")
ev = dev[0].wait_event(["SME: Trying to authenticate",
if "0 120 http://example.com/" not in ev:
raise Exception("Unexpected deauth imminent notice: " + ev)
hapd.request("DEAUTHENTICATE " + addr + " reason=4")
- ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
- if ev is None:
- raise Exception("Timeout on disconnection")
+ ev = dev[0].wait_disconnected(timeout=10)
if "reason=4" not in ev:
raise Exception("Unexpected disconnection reason")
if "[TEMP-DISABLED]" not in dev[0].list_networks()[0]['flags']:
raise Exception("Timeout on deauth imminent notice")
if " 1 100" not in ev:
raise Exception("Unexpected deauth imminent contents")
- ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=3)
- if ev is None:
- raise Exception("Timeout on disconnection")
+ dev[0].wait_disconnected(timeout=3)
def test_ap_hs20_remediation_required(dev, apdev):
"""Hotspot 2.0 connection and remediation required from RADIUS"""
dev[0].scan_for_bss(bssid, freq="2412")
dev[0].request("INTERWORKING_SELECT auto freq=2412")
- ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ ev = dev[0].wait_connected(timeout=15)
if bssid not in ev:
raise Exception("Unexpected network selected")
dev[0].scan_for_bss(bssid2, freq="2412")
dev[0].request("INTERWORKING_SELECT auto freq=2412")
- ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ ev = dev[0].wait_connected(timeout=15)
if bssid2 not in ev:
raise Exception("Unexpected network selected")
dev[0].scan_for_bss(bssid, freq="2412")
dev[0].scan_for_bss(bssid2, freq="2412")
dev[0].request("INTERWORKING_SELECT auto freq=2412")
- ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ ev = dev[0].wait_connected(timeout=15)
if bssid not in ev:
raise Exception("Unexpected network selected")
dev[0].scan_for_bss(bssid, freq="2412")
dev[0].scan_for_bss(bssid2, freq="2412")
dev[0].request("INTERWORKING_SELECT auto freq=2412")
- ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
- if ev is None:
- raise Exception("Connection timed out")
+ ev = dev[0].wait_connected(timeout=15)
if bssid not in ev:
raise Exception("Unexpected network selected")
try:
import sqlite3
except ImportError:
- return "skip"
+ raise HwsimSkip("No sqlite3 module available")
dbfile = os.path.join(params['logdir'], "eap-user.db")
try:
os.remove(dbfile)
dev[0].add_network()
dev[0].request("DISCONNECT")
- ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
- if ev is None:
- raise Exception("Timeout on disconnection")
+ dev[0].wait_disconnected(timeout=10)
hapd.disable()
hapd.set("ssid", "another ssid")
dev[0].remove_cred(id)
if len(dev[0].list_networks()) != 3:
raise Exception("Unexpected number of networks after to remove_crec")
- ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
- if ev is None:
- raise Exception("Timeout on disconnection")
+ dev[0].wait_disconnected(timeout=10)
def _test_ap_hs20_proxyarp(dev, apdev):
bssid = apdev[0]['bssid']
hapd.enable()
except:
# For now, do not report failures due to missing kernel support
- logger.info("Could not start hostapd - assume proxyarp not supported in kernel version")
- return "skip"
+ raise HwsimSkip("Could not start hostapd - assume proxyarp not supported in kernel version")
ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=10)
if ev is None:
raise Exception("AP startup timed out")
def test_ap_hs20_proxyarp(dev, apdev):
"""Hotspot 2.0 and ProxyARP"""
- res = None
try:
- res = _test_ap_hs20_proxyarp(dev, apdev)
+ _test_ap_hs20_proxyarp(dev, apdev)
finally:
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'down'],
stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'ap-br0'],
stderr=open('/dev/null', 'w'))
- return res
-
def _test_ap_hs20_proxyarp_dgaf(dev, apdev, disabled):
bssid = apdev[0]['bssid']
params = hs20_ap_params()
hapd.enable()
except:
# For now, do not report failures due to missing kernel support
- logger.info("Could not start hostapd - assume proxyarp not supported in kernel version")
- return "skip"
+ raise HwsimSkip("Could not start hostapd - assume proxyarp not supported in kernel version")
ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
if ev is None:
raise Exception("AP startup timed out")
def test_ap_hs20_proxyarp_disable_dgaf(dev, apdev):
"""Hotspot 2.0 and ProxyARP with DGAF disabled"""
- res = None
try:
- res = _test_ap_hs20_proxyarp_dgaf(dev, apdev, True)
+ _test_ap_hs20_proxyarp_dgaf(dev, apdev, True)
finally:
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'down'],
stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'ap-br0'],
stderr=open('/dev/null', 'w'))
- return res
-
def test_ap_hs20_proxyarp_enable_dgaf(dev, apdev):
"""Hotspot 2.0 and ProxyARP with DGAF enabled"""
- res = None
try:
- res = _test_ap_hs20_proxyarp_dgaf(dev, apdev, False)
+ _test_ap_hs20_proxyarp_dgaf(dev, apdev, False)
finally:
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'down'],
stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'ap-br0'],
stderr=open('/dev/null', 'w'))
- return res
-
def ip_checksum(buf):
sum = 0
if len(buf) & 0x01:
sum = (sum & 0xffff) + (sum >> 16)
return struct.pack('H', ~sum & 0xffff)
+def ipv6_solicited_node_mcaddr(target):
+ prefix = socket.inet_pton(socket.AF_INET6, "ff02::1:ff00:0")
+ mask = socket.inet_pton(socket.AF_INET6, "::ff:ffff")
+ _target = socket.inet_pton(socket.AF_INET6, target)
+ p = struct.unpack('4I', prefix)
+ m = struct.unpack('4I', mask)
+ t = struct.unpack('4I', _target)
+ res = (p[0] | (t[0] & m[0]),
+ p[1] | (t[1] & m[1]),
+ p[2] | (t[2] & m[2]),
+ p[3] | (t[3] & m[3]))
+ return socket.inet_ntop(socket.AF_INET6, struct.pack('4I', *res))
+
def build_icmpv6(ipv6_addrs, type, code, payload):
start = struct.pack("BB", type, code)
end = payload
proto = '\x86\xdd'
ehdr = link_mc + _src_ll + proto
_ip_src = socket.inet_pton(socket.AF_INET6, ip_src)
+ if ip_dst is None:
+ ip_dst = ipv6_solicited_node_mcaddr(target)
_ip_dst = socket.inet_pton(socket.AF_INET6, ip_dst)
reserved = '\x00\x00\x00\x00'
return ehdr + ipv6 + icmp
+def send_ns(dev, src_ll=None, target=None, ip_src=None, ip_dst=None, opt=None,
+ hapd_bssid=None):
+ if hapd_bssid:
+ if src_ll is None:
+ src_ll = hapd_bssid
+ cmd = "DATA_TEST_FRAME ifname=ap-br0 "
+ else:
+ if src_ll is None:
+ src_ll = dev.p2p_interface_addr()
+ cmd = "DATA_TEST_FRAME "
+
+ if opt is None:
+ opt = "\x01\x01" + binascii.unhexlify(src_ll.replace(':',''))
+
+ pkt = build_ns(src_ll=src_ll, ip_src=ip_src, ip_dst=ip_dst, target=target,
+ opt=opt)
+ if "OK" not in dev.request(cmd + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
def build_na(src_ll, ip_src, ip_dst, target, opt=None):
link_mc = binascii.unhexlify("3333ff000002")
_src_ll = binascii.unhexlify(src_ll.replace(':',''))
return ehdr + ipv6 + icmp
+def send_na(dev, src_ll=None, target=None, ip_src=None, ip_dst=None, opt=None,
+ hapd_bssid=None):
+ if hapd_bssid:
+ if src_ll is None:
+ src_ll = hapd_bssid
+ cmd = "DATA_TEST_FRAME ifname=ap-br0 "
+ else:
+ if src_ll is None:
+ src_ll = dev.p2p_interface_addr()
+ cmd = "DATA_TEST_FRAME "
+
+ pkt = build_na(src_ll=src_ll, ip_src=ip_src, ip_dst=ip_dst, target=target,
+ opt=opt)
+ if "OK" not in dev.request(cmd + binascii.hexlify(pkt)):
+ raise Exception("DATA_TEST_FRAME failed")
+
def build_dhcp_ack(dst_ll, src_ll, ip_src, ip_dst, yiaddr, chaddr,
subnet_mask="255.255.255.0", truncated_opt=False,
wrong_magic=False, force_tot_len=None, no_dhcp=False):
sender_mac = dev.p2p_interface_addr()
cmd = "DATA_TEST_FRAME "
- pkt = build_arp(dst_ll="ff:ff:ff:ff:ff:ff", src_ll=src_ll, opcode=opcode,
+ pkt = build_arp(dst_ll=dst_ll, src_ll=src_ll, opcode=opcode,
sender_mac=sender_mac, sender_ip=sender_ip,
target_mac=target_mac, target_ip=target_ip)
if "OK" not in dev.request(cmd + binascii.hexlify(pkt)):
hapd.enable()
except:
# For now, do not report failures due to missing kernel support
- logger.info("Could not start hostapd - assume proxyarp not supported in kernel version")
- return "skip"
+ raise HwsimSkip("Could not start hostapd - assume proxyarp not supported in kernel version")
ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=10)
if ev is None:
raise Exception("AP startup timed out")
subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
+ for chain in [ 'FORWARD', 'OUTPUT' ]:
+ subprocess.call(['ebtables', '-A', chain, '-p', 'ARP',
+ '-d', 'Broadcast', '-o', apdev[0]['ifname'],
+ '-j', 'DROP'])
+ subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast',
+ '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp',
+ '--ip6-icmp-type', 'neighbor-solicitation',
+ '-o', apdev[0]['ifname'], '-j', 'DROP'])
+ subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast',
+ '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp',
+ '--ip6-icmp-type', 'neighbor-advertisement',
+ '-o', apdev[0]['ifname'], '-j', 'DROP'])
+ subprocess.call(['ebtables', '-A', chain,
+ '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp',
+ '--ip6-icmp-type', 'router-solicitation',
+ '-o', apdev[0]['ifname'], '-j', 'DROP'])
+ # Multicast Listener Report Message
+ subprocess.call(['ebtables', '-A', chain, '-d', 'Multicast',
+ '-p', 'IPv6', '--ip6-protocol', 'ipv6-icmp',
+ '--ip6-icmp-type', '143',
+ '-o', apdev[0]['ifname'], '-j', 'DROP'])
+
cmd = {}
- cmd[0] = subprocess.Popen(['tcpdump', '-i', 'ap-br0', '-w', cap_br,
- '-s', '2000'], stderr=open('/dev/null', 'w'))
- cmd[1] = subprocess.Popen(['tcpdump', '-i', dev[0].ifname, '-w', cap_dev0,
- '-s', '2000'], stderr=open('/dev/null', 'w'))
- cmd[2] = subprocess.Popen(['tcpdump', '-i', dev[1].ifname, '-w', cap_dev1,
- '-s', '2000'], stderr=open('/dev/null', 'w'))
+ cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0',
+ '-w', cap_br, '-s', '2000'],
+ stderr=open('/dev/null', 'w'))
+ cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname,
+ '-w', cap_dev0, '-s', '2000'],
+ stderr=open('/dev/null', 'w'))
+ cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname,
+ '-w', cap_dev1, '-s', '2000'],
+ stderr=open('/dev/null', 'w'))
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':',''))
# DAD NS
- pkt = build_ns(src_ll=addr0, ip_src="::", ip_dst="ff02::1:ff00:2",
- target="aaaa:bbbb:cccc::2", opt=src_ll_opt0)
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[0], ip_src="::", target="aaaa:bbbb:cccc::2")
- pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
- ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
- opt=src_ll_opt0)
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2")
# test frame without source link-layer address option
- pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
- ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2")
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2",
+ opt='')
# test frame with bogus option
- pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
- ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
- opt="\x70\x01\x01\x02\x03\x04\x05\x05")
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2",
+ opt="\x70\x01\x01\x02\x03\x04\x05\x05")
# test frame with truncated source link-layer address option
- pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
- ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
- opt="\x01\x01\x01\x02\x03\x04")
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2",
+ opt="\x01\x01\x01\x02\x03\x04")
# test frame with foreign source link-layer address option
- pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
- ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
- opt="\x01\x01\x01\x02\x03\x04\x05\x06")
- if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[0], ip_src="aaaa:bbbb:cccc::2", target="aaaa:bbbb:cccc::2",
+ opt="\x01\x01\x01\x02\x03\x04\x05\x06")
- pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:dddd::2",
- ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:dddd::2",
- opt=src_ll_opt1)
- if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[1], ip_src="aaaa:bbbb:dddd::2", target="aaaa:bbbb:dddd::2")
- pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:eeee::2",
- ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:eeee::2",
- opt=src_ll_opt1)
- if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[1], ip_src="aaaa:bbbb:eeee::2", target="aaaa:bbbb:eeee::2")
# another copy for additional code coverage
- if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
- raise Exception("DATA_TEST_FRAME failed")
+ send_ns(dev[1], ip_src="aaaa:bbbb:eeee::2", target="aaaa:bbbb:eeee::2")
pkt = build_dhcp_ack(dst_ll="ff:ff:ff:ff:ff:ff", src_ll=bssid,
ip_src="192.168.1.1", ip_dst="255.255.255.255",
target_ip="192.168.1.127")
# ARP Probe from bridge
- send_arp(hapd, hapd_bssid=bssid, target_ip="192.168.1.128")
- # ARP Announcement from bridge
- send_arp(hapd, hapd_bssid=bssid, sender_ip="129.168.1.128",
- target_ip="192.168.1.128")
- send_arp(hapd, hapd_bssid=bssid, sender_ip="129.168.1.128",
- target_ip="192.168.1.128", opcode=2)
+ send_arp(hapd, hapd_bssid=bssid, target_ip="192.168.1.130")
+ # ARP Announcement from bridge (not to be learned by AP for proxyarp)
+ send_arp(hapd, hapd_bssid=bssid, sender_ip="192.168.1.130",
+ target_ip="192.168.1.130")
+ send_arp(hapd, hapd_bssid=bssid, sender_ip="192.168.1.130",
+ target_ip="192.168.1.130", opcode=2)
matches = get_permanent_neighbors("ap-br0")
logger.info("After ARP Probe + Announcement: " + str(matches))
# ARP Request for the newly introduced IP address from wireless STA
- send_arp(dev[0], sender_ip="192.168.1.123", target_ip="192.168.1.128")
+ send_arp(dev[0], sender_ip="192.168.1.123", target_ip="192.168.1.130")
+ # ARP Response from bridge (AP does not proxy for non-wireless devices)
+ send_arp(hapd, hapd_bssid=bssid, dst_ll=addr0, sender_ip="192.168.1.130",
+ target_ip="192.168.1.123", opcode=2)
# ARP Request for the newly introduced IP address from bridge
send_arp(hapd, hapd_bssid=bssid, sender_ip="192.168.1.102",
- target_ip="192.168.1.128")
+ target_ip="192.168.1.130")
# ARP Probe from wireless STA (duplicate address; learned through DHCP)
send_arp(dev[1], target_ip="192.168.1.123")
time.sleep(0.1)
+ send_ns(dev[0], target="aaaa:bbbb:dddd::2", ip_src="aaaa:bbbb:cccc::2")
+ time.sleep(0.1)
+ send_ns(dev[1], target="aaaa:bbbb:cccc::2", ip_src="aaaa:bbbb:dddd::2")
+ time.sleep(0.1)
+ send_ns(hapd, hapd_bssid=bssid, target="aaaa:bbbb:dddd::2",
+ ip_src="aaaa:bbbb:ffff::2")
+ time.sleep(0.1)
+
+ # Try to probe for an already assigned address
+ send_ns(dev[1], target="aaaa:bbbb:cccc::2", ip_src="::")
+ time.sleep(0.1)
+ send_ns(hapd, hapd_bssid=bssid, target="aaaa:bbbb:cccc::2", ip_src="::")
+ time.sleep(0.1)
+
+ # Unsolicited NA
+ send_na(dev[1], target="aaaa:bbbb:cccc:aeae::3",
+ ip_src="aaaa:bbbb:cccc:aeae::3", ip_dst="ff02::1")
+ send_na(hapd, hapd_bssid=bssid, target="aaaa:bbbb:cccc:aeae::4",
+ ip_src="aaaa:bbbb:cccc:aeae::4", ip_dst="ff02::1")
+
+ hwsim_utils.test_connectivity_iface(dev[0], hapd, "ap-br0")
+ hwsim_utils.test_connectivity_iface(dev[1], hapd, "ap-br0")
+ hwsim_utils.test_connectivity(dev[0], dev[1])
+
dev[0].request("DISCONNECT")
dev[1].request("DISCONNECT")
time.sleep(0.5)
logger.info("After disconnect: " + str(matches))
if len(matches) > 0:
raise Exception("Unexpected neighbor entries after disconnect")
+ cmd = subprocess.Popen(['ebtables', '-L', '--Lc'], stdout=subprocess.PIPE)
+ res = cmd.stdout.read()
+ cmd.stdout.close()
+ logger.info("ebtables results:\n" + res)
def test_proxyarp_open(dev, apdev, params):
"""ProxyARP with open network"""
- res = None
try:
- res = _test_proxyarp_open(dev, apdev, params)
+ _test_proxyarp_open(dev, apdev, params)
finally:
+ try:
+ subprocess.call(['ebtables', '-F', 'FORWARD'])
+ subprocess.call(['ebtables', '-F', 'OUTPUT'])
+ except:
+ pass
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'down'],
stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'ap-br0'],
stderr=open('/dev/null', 'w'))
-
- return res