# Source: git.ipfire.org Git - thirdparty/hostap.git / blob - tests/hwsim/utils.py
# Commit subject: tests: Clear regulatory domain on the correct remote device
# Path: [thirdparty/hostap.git] / tests / hwsim / utils.py
1 # Testing utilities
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import binascii
8 import os
9 import socket
10 import struct
11 import subprocess
12 import time
13 import remotehost
14 import logging
15 logger = logging.getLogger()
16
def get_ifnames():
    """Return the interface names listed in /proc/net/dev.

    Each line that contains a colon contributes the text before its first
    colon with surrounding spaces removed; lines without a colon (the table
    header) are ignored.
    """
    with open("/proc/net/dev", "r") as proc_file:
        rows = proc_file.readlines()
    split_rows = [row.split(':', 1) for row in rows]
    return [parts[0].strip(' ') for parts in split_rows if len(parts) == 2]
26
class HwsimSkip(Exception):
    """Exception carrying the reason why a test has to be skipped.

    Helpers in this file raise it when a precondition is not met (for
    example when a test command is not supported by the device). The
    explanation is stored in ``reason`` and is what ``str()`` returns.
    """

    def __init__(self, reason):
        # Forward the reason to Exception so that args, repr() and pickling
        # carry it instead of an empty tuple.
        super(HwsimSkip, self).__init__(reason)
        self.reason = reason

    def __str__(self):
        return self.reason
32
class alloc_fail(object):
    """Context manager that arms TEST_ALLOC_FAIL on a device.

    On entry the command "TEST_ALLOC_FAIL <count>:<funcs>" is sent through
    dev.request(); HwsimSkip is raised unless the reply contains "OK".
    When the with-body finishes without an exception, GET_ALLOC_FAIL has to
    report "0:<funcs>", otherwise an Exception is raised.
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request(
            "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # An exception from the with-body is left to propagate unchanged.
        if type is not None:
            return
        remaining = self._dev.request("GET_ALLOC_FAIL")
        if remaining != "0:%s" % self._funcs:
            raise Exception("Allocation failure did not trigger")
46
class fail_test(object):
    """Context manager that arms TEST_FAIL on a device.

    On entry the command "TEST_FAIL <count>:<funcs>" is sent through
    dev.request(); HwsimSkip is raised unless the reply contains "OK".
    When the with-body finishes without an exception, GET_FAIL has to
    report "0:<funcs>", otherwise an Exception is raised.
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request(
            "TEST_FAIL %d:%s" % (self._count, self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # An exception from the with-body is left to propagate unchanged.
        if type is not None:
            return
        remaining = self._dev.request("GET_FAIL")
        if remaining != "0:%s" % self._funcs:
            raise Exception("Test failure did not trigger")
60
def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40,
                      timeout=0.05):
    """Poll dev.request(cmd) until the reply starts with "0:".

    At most max_iter requests are made, with a sleep of timeout seconds
    between consecutive ones. If the last permitted reply still does not
    start with "0:", Exception(note) is raised without a further sleep.
    """
    for attempt in range(max_iter):
        if dev.request(cmd).startswith("0:"):
            return
        if attempt + 1 == max_iter:
            raise Exception(note)
        time.sleep(timeout)
69
def require_under_vm():
    """Raise HwsimSkip unless /proc/1/cmdline contains "inside.sh"."""
    with open('/proc/1/cmdline', 'r') as init_cmdline:
        found_marker = "inside.sh" in init_cmdline.read()
    if not found_marker:
        raise HwsimSkip("Not running under VM")
75
def iface_is_in_bridge(bridge, ifname):
    """Tell whether ifname is a port of the bridge named bridge.

    The answer comes from the sysfs entry
    /sys/class/net/<ifname>/brport/bridge: it has to exist, be a symbolic
    link, and the last path component of its target has to equal bridge.
    """
    link = "/sys/class/net/" + ifname + "/brport/bridge"
    if not (os.path.exists(link) and os.path.islink(link)):
        return False
    return bridge == os.path.basename(os.readlink(link))
86
def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Raise HwsimSkip(reason) when dev reports 'FIPS' in its "fips" capability.

    Nothing happens when dev.get_capability("fips") returns a falsy value
    or a value that does not contain 'FIPS'.
    """
    fips_cap = dev.get_capability("fips")
    if not fips_cap:
        return
    if 'FIPS' in fips_cap:
        raise HwsimSkip(reason)
91
def get_phy(ap, ifname=None):
    """Return the wiphy name ("phy<N>") reported by "iw dev <ifname> info".

    ap is indexed with 'hostname' to pick the host the command runs on (a
    missing entry yields remotehost.Host(None)) and, when ifname is not
    given, with 'ifname' to pick the interface.

    Raises Exception when the iw command exits with a non-zero status.
    If no output line mentions "wiphy", the fallback "phy3" is returned.
    """
    # Only a missing or non-indexable hostname entry is tolerated; a bare
    # except here would also swallow KeyboardInterrupt and SystemExit.
    try:
        hostname = ap['hostname']
    except (KeyError, TypeError):
        hostname = None
    host = remotehost.Host(hostname)

    if ifname is None:
        ifname = ap['ifname']
    status, buf = host.execute(["iw", "dev", ifname, "info"])
    if status != 0:
        raise Exception("iw " + ifname + " info failed")
    for line in buf.split("\n"):
        if "wiphy" in line:
            # The second whitespace-separated word is taken as the index.
            return "phy" + line.split()[1]
    return "phy3"
112
def parse_ie(buf):
    """Decode a hex string of (id, length, value) elements into a dict.

    buf is unhexlified and walked as a sequence of elements made of a
    one-byte identifier, a one-byte length and that many value bytes. The
    result maps each identifier to its value bytes; a repeated identifier
    keeps the last value. Parsing stops at the first element whose declared
    length exceeds the remaining data, or when fewer than two bytes remain.
    """
    raw = binascii.unhexlify(buf)
    total = len(raw)
    elements = {}
    pos = 0
    while total - pos >= 2:
        eid, length = struct.unpack_from('BB', raw, pos)
        pos += 2
        if length > total - pos:
            break
        elements[eid] = raw[pos:pos + length]
        pos += length
    return elements
124
def wait_regdom_changes(dev):
    """Consume pending CTRL-EVENT-REGDOM-CHANGE events from dev.

    dev.wait_event() is called with a timeout of 0.1 up to ten times; the
    polling ends early at the first call that returns None.
    """
    polls_left = 10
    while polls_left > 0:
        polls_left -= 1
        if dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1) is None:
            return
130
def clear_country(dev):
    """Attempt to clear the country setting using the first two devices.

    dev[1] is configured with an open network "country-clear" in mode 2 on
    2412 and selected. Once it reports CTRL-EVENT-CONNECTED, dev[0]
    connects to that network, both sides are disconnected, any scan on
    dev[0] is aborted and, after a one second pause, the monitor queues of
    both devices are dumped.
    """
    logger.info("Try to clear country")
    owner = dev[1]
    net_id = owner.add_network()
    owner.set_network(net_id, "mode", "2")
    owner.set_network_quoted(net_id, "ssid", "country-clear")
    owner.set_network(net_id, "key_mgmt", "NONE")
    owner.set_network(net_id, "frequency", "2412")
    owner.set_network(net_id, "scan_freq", "2412")
    owner.select_network(net_id)
    connected = owner.wait_event(["CTRL-EVENT-CONNECTED"])
    # NOTE(review): everything below is treated as the body of this
    # condition; nothing runs when the connected event never arrives.
    if not connected:
        return
    peer = dev[0]
    peer.connect("country-clear", key_mgmt="NONE", scan_freq="2412")
    owner.request("DISCONNECT")
    peer.wait_disconnected()
    peer.request("DISCONNECT")
    peer.request("ABORT_SCAN")
    time.sleep(1)
    peer.dump_monitor()
    owner.dump_monitor()
150
def clear_regdom(hapd, dev, count=1):
    """Disable hapd (when one is given), then clear the regulatory domain.

    The clearing itself is delegated to clear_regdom_dev() for the first
    count entries of dev.
    """
    disable_hapd(hapd)
    clear_regdom_dev(dev, count=count)
154
def disable_hapd(hapd):
    """Send DISABLE to hapd and pause for 0.1 s; do nothing if hapd is falsy."""
    if not hapd:
        return
    hapd.request("DISABLE")
    time.sleep(0.1)
159
def clear_regdom_dev(dev, count=1):
    """Reset the regulatory domain to "00" through dev[0].

    The first count devices are disconnected and their scans stopped, then
    "iw reg set 00" is executed via dev[0] and pending regdom change events
    are consumed. If the driver status of dev[0] still reports a country
    other than "00", clear_country(dev) is used as a second attempt.
    Finally the scan caches of the first count devices are flushed.
    """
    for i in range(count):
        dev[i].request("DISCONNECT")
    for i in range(count):
        dev[i].disconnect_and_stop_scan()
    primary = dev[0]
    primary.cmd_execute(['iw', 'reg', 'set', '00'])
    wait_regdom_changes(primary)
    country = primary.get_driver_status_field("country")
    # Lazy %-formatting keeps the log call from raising TypeError when the
    # status field is not a string (presumably None when the driver does
    # not report a country - verify against get_driver_status_field()).
    logger.info("Country code at the end: %s", country)
    if country != "00":
        clear_country(dev)
    for i in range(count):
        dev[i].flush_scan_cache()
173
def radiotap_build():
    """Return a fixed 14-byte radiotap header as bytes.

    Layout: an 8-byte little-endian prefix (two zero bytes, the 16-bit
    total length and the 32-bit value 0xc002) followed by the six payload
    bytes 08 00 00 00 00 00.
    """
    # NOTE(review): 0xc002 is presumably the radiotap "present" bitmap
    # describing the payload fields - confirm against the radiotap spec.
    present = 0xc002
    payload = struct.pack('6B', 0x08, 0, 0, 0, 0, 0)
    prefix = struct.pack('<BBHL', 0, 0, 8 + len(payload), present)
    return prefix + payload
181
def start_monitor(ifname, freq=2412):
    """Put ifname into monitor mode on freq and return a raw capture socket.

    Runs "iw <ifname> set type monitor", "ip link set dev <ifname> up" and
    "iw <ifname> set freq <freq>". The two iw commands raise
    subprocess.CalledProcessError on a non-zero exit status, while the
    result of the ip command is ignored.

    Returns an AF_PACKET/SOCK_RAW socket bound to ifname with a 0.5 second
    timeout.
    """
    set_monitor = ["iw", ifname, "set", "type", "monitor"]
    link_up = ["ip", "link", "set", "dev", ifname, "up"]
    set_freq = ["iw", ifname, "set", "freq", str(freq)]
    subprocess.check_call(set_monitor)
    subprocess.call(link_up)
    subprocess.check_call(set_freq)

    eth_p_all = 3
    capture = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                            socket.htons(eth_p_all))
    capture.bind((ifname, 0))
    capture.settimeout(0.5)
    return capture
193
def stop_monitor(ifname):
    """Bring ifname down and switch it back to managed mode.

    Runs "ip link set dev <ifname> down" followed by
    "iw <ifname> set type managed"; the exit status of both is ignored.
    """
    commands = (
        ["ip", "link", "set", "dev", ifname, "down"],
        ["iw", ifname, "set", "type", "managed"],
    )
    for argv in commands:
        subprocess.call(argv)