git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/utils.py
tests: Extended Key ID
[thirdparty/hostap.git] / tests / hwsim / utils.py
1 # Testing utilities
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import binascii
8 import os
9 import socket
10 import struct
11 import subprocess
12 import time
13 import remotehost
14 import logging
15 logger = logging.getLogger()
16
def get_ifnames():
    """Return the interface names listed in /proc/net/dev.

    Every line of the file that contains a ':' contributes the text before
    the first ':' with surrounding spaces removed; lines without a ':' (the
    table headers) are ignored.
    """
    with open("/proc/net/dev", "r") as f:
        fields = [entry.split(':', 1) for entry in f]
    return [parts[0].strip(' ') for parts in fields if len(parts) == 2]
26
class HwsimSkip(Exception):
    """Exception raised by the helpers in this file to skip a test case.

    The human-readable explanation is kept in the ``reason`` attribute and is
    also what ``str()`` returns.
    """

    def __init__(self, reason):
        # Hand the reason to Exception as well so that ``args``, ``repr()``
        # and pickling/copying carry it instead of an empty tuple.
        super(HwsimSkip, self).__init__(reason)
        self.reason = reason

    def __str__(self):
        return self.reason
32
class alloc_fail(object):
    """Context manager that arms TEST_ALLOC_FAIL on a device.

    On entry the command "TEST_ALLOC_FAIL <count>:<funcs>" is sent through
    dev.request(); a reply without "OK" raises HwsimSkip. When the body
    finishes without an exception, "GET_ALLOC_FAIL" must answer
    "0:<funcs>", otherwise an Exception is raised.
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request(
            "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # An exception from the body propagates unchanged; the trigger check
        # is only meaningful after a clean run.
        if type is not None:
            return
        state = self._dev.request("GET_ALLOC_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("Allocation failure did not trigger")
46
class fail_test(object):
    """Context manager that arms TEST_FAIL on a device.

    On entry the command "TEST_FAIL <count>:<funcs>" is sent through
    dev.request(); a reply without "OK" raises HwsimSkip. When the body
    finishes without an exception, "GET_FAIL" must answer "0:<funcs>",
    otherwise an Exception is raised.
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request(
            "TEST_FAIL %d:%s" % (self._count, self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # An exception from the body propagates unchanged; the trigger check
        # is only meaningful after a clean run.
        if type is not None:
            return
        state = self._dev.request("GET_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("Test failure did not trigger")
60
def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40,
                      timeout=0.05):
    """Poll dev.request(cmd) until the reply starts with "0:".

    At most max_iter requests are issued, sleeping timeout seconds between
    them. If the last permitted request still does not start with "0:",
    Exception(note) is raised. With max_iter <= 0 nothing is polled and the
    function simply returns.
    """
    for attempt in range(max_iter):
        if dev.request(cmd).startswith("0:"):
            return
        if attempt + 1 == max_iter:
            # Out of attempts: report instead of sleeping once more.
            raise Exception(note)
        time.sleep(timeout)
69
def require_under_vm():
    """Raise HwsimSkip unless the command line of PID 1 mentions "inside.sh".

    The check reads /proc/1/cmdline and looks for the substring "inside.sh";
    presumably that script is the init process of the test VM - confirm
    against the VM setup.
    """
    with open('/proc/1/cmdline', 'r') as f:
        init_cmdline = f.read()
    if "inside.sh" in init_cmdline:
        return
    raise HwsimSkip("Not running under VM")
75
def iface_is_in_bridge(bridge, ifname):
    """Return True if ifname is a port of the bridge named bridge.

    The answer comes from the sysfs entry
    /sys/class/net/<ifname>/brport/bridge: it has to exist and be a symbolic
    link, and the final component of the link target has to equal bridge.
    In every other case False is returned.
    """
    link = "/sys/class/net/" + ifname + "/brport/bridge"
    if not (os.path.exists(link) and os.path.islink(link)):
        return False
    actual_bridge = os.path.basename(os.readlink(link))
    return bridge == actual_bridge
86
def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Raise HwsimSkip(reason) when dev reports 'FIPS' in its "fips" capability.

    The capability is read with dev.get_capability("fips"); an empty or
    missing result, or one that does not contain 'FIPS', leaves the test
    running.
    """
    capa = dev.get_capability("fips")
    if not capa or 'FIPS' not in capa:
        return
    raise HwsimSkip(reason)
91
def check_ext_key_id_capa(dev):
    """Raise HwsimSkip unless bit 63 of the driver 'capa.flags' value is set.

    The field is fetched with dev.get_driver_status_field('capa.flags') and
    parsed with int(value, 0), so hexadecimal ("0x...") and decimal strings
    are both accepted.
    """
    # Mask tested by this helper as the Extended Key ID capability bit.
    ext_key_id_mask = 0x8000000000000000
    flags = int(dev.get_driver_status_field('capa.flags'), 0)
    if not flags & ext_key_id_mask:
        raise HwsimSkip("Extended Key ID not supported")
96
def get_phy(ap, ifname=None):
    """Return the "phy<N>" name of the wiphy behind an interface.

    ap is indexed with 'hostname' (optional) and, when ifname is not given,
    with 'ifname'. "iw dev <ifname> info" is run through
    remotehost.Host(hostname), and the second word of the first output line
    containing "wiphy" supplies <N>. If no such line exists the default
    "phy3" is returned.

    Raises Exception if the iw command exits with a non-zero status.
    """
    phy = "phy3"
    try:
        hostname = ap['hostname']
    except (KeyError, TypeError):
        # No hostname entry (or ap is not subscriptable): pass None to
        # remotehost.Host(). Narrower than a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed here.
        hostname = None
    host = remotehost.Host(hostname)

    if ifname is None:
        ifname = ap['ifname']
    status, buf = host.execute(["iw", "dev", ifname, "info"])
    if status != 0:
        raise Exception("iw " + ifname + " info failed")
    for line in buf.split("\n"):
        if "wiphy" in line:
            phy = "phy" + line.split()[1]
            break
    return phy
117
def parse_ie(buf):
    """Parse a hex string of (id, length, payload) elements into a dict.

    buf is decoded with binascii.unhexlify() and walked as a sequence of
    one-byte id, one-byte length and length payload bytes. The result maps
    each id to its payload; a repeated id keeps the last payload seen.
    Parsing stops quietly at the first element whose declared length runs
    past the end of the data, or when fewer than two bytes remain.
    """
    raw = binascii.unhexlify(buf)
    total = len(raw)
    elems = {}
    pos = 0
    while total - pos >= 2:
        eid, length = struct.unpack_from('BB', raw, pos)
        pos += 2
        if length > total - pos:
            break
        elems[eid] = raw[pos:pos + length]
        pos += length
    return elems
129
def wait_regdom_changes(dev):
    """Consume pending CTRL-EVENT-REGDOM-CHANGE events from dev.

    dev.wait_event() is called with a 0.1 second timeout until it returns
    None, for at most 10 calls.
    """
    remaining = 10
    while remaining > 0:
        remaining -= 1
        event = dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1)
        if event is None:
            return
135
def clear_country(dev):
    """Try to clear the country code by a short connection between two devices.

    dev[1] is configured with an open network (mode "2", SSID
    "country-clear", 2412 MHz) and selected. If it reports
    CTRL-EVENT-CONNECTED, dev[0] connects to that network, both sides are
    disconnected again, any scan on dev[0] is aborted and, after a one
    second pause, both event monitors are dumped.
    """
    logger.info("Try to clear country")
    sta = dev[0]
    ap_side = dev[1]

    net = ap_side.add_network()
    ap_side.set_network(net, "mode", "2")
    ap_side.set_network_quoted(net, "ssid", "country-clear")
    ap_side.set_network(net, "key_mgmt", "NONE")
    ap_side.set_network(net, "frequency", "2412")
    ap_side.set_network(net, "scan_freq", "2412")
    ap_side.select_network(net)

    if not ap_side.wait_event(["CTRL-EVENT-CONNECTED"]):
        # The temporary network never came up; nothing to connect to.
        return

    sta.connect("country-clear", key_mgmt="NONE", scan_freq="2412")
    ap_side.request("DISCONNECT")
    sta.wait_disconnected()
    sta.request("DISCONNECT")
    sta.request("ABORT_SCAN")
    time.sleep(1)
    sta.dump_monitor()
    ap_side.dump_monitor()
155
def clear_regdom(hapd, dev, count=1):
    """Disable hapd (if given) and then reset the regulatory domain.

    Runs disable_hapd(hapd) followed by clear_regdom_dev(dev, count).
    """
    disable_hapd(hapd)
    clear_regdom_dev(dev, count=count)
159
def disable_hapd(hapd):
    """Send "DISABLE" to hapd and pause for 0.1 seconds.

    Does nothing when hapd is falsy (for example None).
    """
    if not hapd:
        return
    hapd.request("DISABLE")
    time.sleep(0.1)
164
def clear_regdom_dev(dev, count=1):
    """Disconnect the first count devices and reset the regulatory domain.

    Steps, in order: request DISCONNECT on dev[0..count-1]; call
    disconnect_and_stop_scan() on the same devices; run "iw reg set 00"
    through dev[0]; drain regulatory-domain change events; log the country
    reported by dev[0] and, if it is not "00", fall back to
    clear_country(dev); finally flush the scan cache of each device.
    """
    indexes = range(count)
    for idx in indexes:
        dev[idx].request("DISCONNECT")
    for idx in indexes:
        dev[idx].disconnect_and_stop_scan()

    first = dev[0]
    first.cmd_execute(['iw', 'reg', 'set', '00'])
    wait_regdom_changes(first)
    country = first.get_driver_status_field("country")
    logger.info("Country code at the end: " + country)
    if country != "00":
        clear_country(dev)

    for idx in indexes:
        dev[idx].flush_scan_cache()
178
def radiotap_build():
    """Return a fixed 14-byte radiotap header as bytes.

    Layout: an 8-byte little-endian header packed as '<BBHL' (two zero
    bytes, the total length as a 16-bit value, and the 32-bit word 0xc002,
    which has bits 1, 14 and 15 set) followed by six payload bytes, of which
    only the first is non-zero (0x08).
    """
    # Presumably flags + pad, RX flags and TX flags per the radiotap field
    # definitions - confirm against the radiotap specification.
    payload = struct.pack('6B', 0x08, 0, 0, 0, 0, 0)
    header = struct.pack('<BBHL', 0, 0, 8 + len(payload), 0xc002)
    return header + payload
186
def start_monitor(ifname, freq=2412):
    """Switch ifname to monitor mode and return a raw packet socket on it.

    The interface is set to type monitor, brought up and tuned to freq using
    the iw/ip command line tools. The returned AF_PACKET/SOCK_RAW socket is
    bound to ifname and has a 0.5 second timeout.

    Raises subprocess.CalledProcessError if either iw command fails; the
    result of "ip link set ... up" is not checked. Socket errors propagate
    after the socket has been closed.
    """
    subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
    subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
    subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])

    ETH_P_ALL = 3
    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                         socket.htons(ETH_P_ALL))
    try:
        sock.bind((ifname, 0))
        sock.settimeout(0.5)
    except Exception:
        # Do not leak the descriptor when the socket cannot be set up.
        sock.close()
        raise
    return sock
198
def stop_monitor(ifname):
    """Bring ifname down and set its type back to managed.

    Both commands are run with subprocess.call(), so their exit status is
    ignored.
    """
    commands = (
        ["ip", "link", "set", "dev", ifname, "down"],
        ["iw", ifname, "set", "type", "managed"],
    )
    for argv in commands:
        subprocess.call(argv)