]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/utils.py
tests: Protocol testing for supplicant PMF/IGTK KDE handling
[thirdparty/hostap.git] / tests / hwsim / utils.py
1 # Testing utilities
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import binascii
8 import os
9 import socket
10 import struct
11 import subprocess
12 import time
13 import remotehost
14 import logging
15 logger = logging.getLogger()
16
def get_ifnames():
    """Return the interface names listed in /proc/net/dev.

    Every line that contains a colon is treated as an interface entry; the
    text before the first colon, with surrounding spaces removed, is the
    name. Lines without a colon are skipped.
    """
    with open("/proc/net/dev", "r") as proc_file:
        # partition() yields an empty separator when the line has no colon,
        # which is how non-interface rows are filtered out.
        pieces = [row.partition(':') for row in proc_file]
    return [name.strip(' ') for name, colon, _ in pieces if colon]
26
class HwsimSkip(Exception):
    """Exception raised by the helpers in this file to skip a test.

    The human-readable explanation is stored in ``reason`` and is also the
    value returned by ``str()`` on the exception.
    """

    def __init__(self, reason):
        # Forward the reason to Exception so that ``args`` is populated;
        # without it the exception cannot be re-created from its args
        # (pickle/copy call the constructor with ``args``) and repr() is
        # empty.
        super(HwsimSkip, self).__init__(reason)
        self.reason = reason

    def __str__(self):
        return self.reason
32
class alloc_fail(object):
    """Context manager that arms TEST_ALLOC_FAIL on a device.

    On entry the command "TEST_ALLOC_FAIL <count>:<funcs>" is sent through
    ``dev.request``; HwsimSkip is raised when the reply does not contain
    "OK". When the with-body finishes without an exception, the reply to
    GET_ALLOC_FAIL must equal "0:<funcs>", otherwise an Exception is raised.
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request(
            "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs))
        if "OK" in reply:
            return
        raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # An exception is already propagating from the with-body: leave it
        # alone and do not query the device.
        if type is not None:
            return
        state = self._dev.request("GET_ALLOC_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("Allocation failure did not trigger")
46
class fail_test(object):
    """Context manager that arms TEST_FAIL on a device.

    On entry the command "TEST_FAIL <count>:<funcs>" is sent through
    ``dev.request``; HwsimSkip is raised when the reply does not contain
    "OK". When the with-body finishes without an exception, the reply to
    GET_FAIL must equal "0:<funcs>", otherwise an Exception is raised.
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request(
            "TEST_FAIL %d:%s" % (self._count, self._funcs))
        if "OK" in reply:
            return
        raise HwsimSkip("TEST_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # An exception is already propagating from the with-body: leave it
        # alone and do not query the device.
        if type is not None:
            return
        state = self._dev.request("GET_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("Test failure did not trigger")
60
def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40):
    """Poll ``dev`` until the reply to ``cmd`` starts with "0:".

    The command is issued at most ``max_iter`` times with a 0.05 s pause
    between attempts. If the last allowed attempt still does not report
    "0:", an Exception carrying ``note`` is raised. With ``max_iter`` of
    zero or less nothing is sent and the function simply returns.
    """
    final_attempt = max_iter - 1
    attempt = 0
    while attempt < max_iter:
        if dev.request(cmd).startswith("0:"):
            return
        if attempt == final_attempt:
            # No point in sleeping after the last poll; fail right away.
            raise Exception(note)
        time.sleep(0.05)
        attempt += 1
68
def require_under_vm():
    """Raise HwsimSkip unless the command line of PID 1 mentions inside.sh.

    Reads /proc/1/cmdline and checks whether the string "inside.sh" occurs
    anywhere in it.
    """
    with open('/proc/1/cmdline', 'r') as cmdline_file:
        init_cmdline = cmdline_file.read()
    if "inside.sh" in init_cmdline:
        return
    raise HwsimSkip("Not running under VM")
74
def iface_is_in_bridge(bridge, ifname):
    """Return True when ``ifname`` is attached to the bridge ``bridge``.

    The answer comes from the sysfs entry
    /sys/class/net/<ifname>/brport/bridge: it has to exist and be a
    symbolic link whose target's last path component equals ``bridge``.
    In every other case False is returned.
    """
    link = "/sys/class/net/" + ifname + "/brport/bridge"
    if not (os.path.exists(link) and os.path.islink(link)):
        return False
    actual_bridge = os.path.basename(os.readlink(link))
    return bridge == actual_bridge
85
def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Raise HwsimSkip(reason) when ``dev`` reports a FIPS capability.

    ``dev.get_capability("fips")`` is queried; an empty or None result, or
    one that does not contain "FIPS", lets the caller continue.
    """
    capability = dev.get_capability("fips")
    if not capability:
        return
    if 'FIPS' in capability:
        raise HwsimSkip(reason)
90
def get_phy(ap, ifname=None):
    """Return the wiphy name (for example "phy0") behind an interface.

    Runs "iw dev <ifname> info" through ``remotehost.Host`` and builds the
    result from the number on the "wiphy" line of the output.

    ap: mapping describing the AP. ``ap['ifname']`` is used when ``ifname``
        is not given; ``ap['hostname']``, when available, is passed to
        ``remotehost.Host`` (None is passed otherwise).
    ifname: interface to query instead of ``ap['ifname']``.

    Returns "phy<N>", or the fallback "phy3" when the output has no wiphy
    line. Raises Exception when the iw command exits with a non-zero
    status.
    """
    phy = "phy3"
    try:
        hostname = ap['hostname']
    except Exception:
        # The hostname is optional, so any lookup failure falls back to
        # None. Unlike a bare except this does not intercept
        # KeyboardInterrupt or SystemExit.
        hostname = None
    host = remotehost.Host(hostname)

    if ifname is None:
        ifname = ap['ifname']
    status, buf = host.execute(["iw", "dev", ifname, "info"])
    if status != 0:
        raise Exception("iw " + ifname + " info failed")
    for line in buf.split("\n"):
        words = line.split()
        # Require the first field to be exactly "wiphy". A substring test
        # would also match other lines that merely contain the text (an
        # "ssid" line, for instance) and would fail on a one-word line.
        if len(words) >= 2 and words[0] == "wiphy":
            return "phy" + words[1]
    return phy
111
def parse_ie(buf):
    """Decode a hex string of information elements into a dict.

    ``buf`` is unhexlified and walked as a sequence of records made of a
    one-byte id, a one-byte length and ``length`` payload bytes. The result
    maps each id to its payload; when an id repeats, the last occurrence
    wins. Parsing stops at the first record whose declared length exceeds
    the remaining data, and a trailing single byte is ignored.
    """
    raw = binascii.unhexlify(buf)
    total = len(raw)
    elements = {}
    pos = 0
    while total - pos >= 2:
        ie, elen = struct.unpack_from('BB', raw, pos)
        pos += 2
        if elen > total - pos:
            # Truncated element: keep what was parsed so far.
            break
        elements[ie] = raw[pos:pos + elen]
        pos += elen
    return elements
123
def wait_regdom_changes(dev):
    """Consume pending CTRL-EVENT-REGDOM-CHANGE events from ``dev``.

    ``dev.wait_event`` is called with a 0.1 timeout up to ten times; the
    function returns as soon as one call yields None.
    """
    remaining = 10
    while remaining > 0:
        remaining -= 1
        if dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1) is None:
            return
129
def clear_country(dev):
    """Try to clear the country setting using a temporary open network.

    dev[1] is given a new network (mode "2", SSID "country-clear",
    key_mgmt NONE, frequency and scan_freq 2412) which is then selected.
    If dev[1] reports CTRL-EVENT-CONNECTED, dev[0] connects to that
    network, dev[1] is disconnected, dev[0] is awaited to disconnect and is
    told to disconnect and abort scanning, and after a one second pause the
    monitors of both devices are dumped.
    """
    logger.info("Try to clear country")
    ap_dev = dev[1]
    net_id = ap_dev.add_network()
    ap_dev.set_network(net_id, "mode", "2")
    ap_dev.set_network_quoted(net_id, "ssid", "country-clear")
    for field, value in (("key_mgmt", "NONE"),
                         ("frequency", "2412"),
                         ("scan_freq", "2412")):
        ap_dev.set_network(net_id, field, value)
    ap_dev.select_network(net_id)

    # NOTE(review): the listing this was taken from had lost its
    # indentation; every step below is treated as conditional on the
    # CONNECTED event - confirm against the repository.
    if not ap_dev.wait_event(["CTRL-EVENT-CONNECTED"]):
        return

    sta_dev = dev[0]
    sta_dev.connect("country-clear", key_mgmt="NONE", scan_freq="2412")
    ap_dev.request("DISCONNECT")
    sta_dev.wait_disconnected()
    sta_dev.request("DISCONNECT")
    sta_dev.request("ABORT_SCAN")
    time.sleep(1)
    sta_dev.dump_monitor()
    ap_dev.dump_monitor()
149
def clear_regdom(hapd, dev, count=1):
    """Reset the regulatory domain to "00" after a test.

    hapd: optional AP control object; when truthy it is sent "DISABLE"
        followed by a 0.1 s pause.
    dev: indexable collection of station devices.
    count: number of leading entries of ``dev`` to disconnect and whose
        scan caches are flushed at the end.

    After disconnecting the stations, "iw reg set 00" is run and pending
    regdom change events on dev[0] are consumed. If dev[0] still reports a
    country other than "00", clear_country() is tried as well.
    """
    if hapd:
        hapd.request("DISABLE")
        time.sleep(0.1)

    indices = range(count)
    # Send DISCONNECT to every station first, then stop their scans.
    for idx in indices:
        dev[idx].request("DISCONNECT")
    for idx in indices:
        dev[idx].disconnect_and_stop_scan()

    subprocess.call(['iw', 'reg', 'set', '00'])
    wait_regdom_changes(dev[0])
    final_country = dev[0].get_driver_status_field("country")
    logger.info("Country code at the end: " + final_country)
    if final_country != "00":
        clear_country(dev)

    for idx in indices:
        dev[idx].flush_scan_cache()
166
def radiotap_build():
    """Return a fixed 14-byte radiotap header as bytes.

    The first eight bytes are packed little-endian as two zero bytes, a
    16-bit total length (14) and the 32-bit value 0xc002; they are followed
    by the six payload bytes 08 00 00 00 00 00.
    """
    # presumably: flags byte 0x08 plus padding and two 16-bit fields
    # selected by the 0xc002 bitmap - verify against the radiotap spec
    payload = struct.pack('6B', 0x08, 0, 0, 0, 0, 0)
    header = struct.pack('<BBHL', 0, 0, 8 + len(payload), 0xc002)
    return header + payload
174
def start_monitor(ifname, freq=2412):
    """Switch ``ifname`` to monitor mode and return a raw capture socket.

    The interface type is set to monitor, the link is brought up and the
    frequency is set to ``freq`` using iw/ip. A failure of either iw
    command raises subprocess.CalledProcessError, while the result of
    bringing the link up is ignored. The returned AF_PACKET/SOCK_RAW socket
    is bound to ``ifname`` and has a 0.5 s timeout.
    """
    subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
    subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
    subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])

    ETH_P_ALL = 3
    protocol = socket.htons(ETH_P_ALL)
    capture = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, protocol)
    capture.bind((ifname, 0))
    capture.settimeout(0.5)
    return capture
186
def stop_monitor(ifname):
    """Bring ``ifname`` down and set its type back to managed.

    Both commands are run with subprocess.call, so their exit status is
    ignored.
    """
    commands = (
        ["ip", "link", "set", "dev", ifname, "down"],
        ["iw", ifname, "set", "type", "managed"],
    )
    for argv in commands:
        subprocess.call(argv)