]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/utils.py
dbf9a18f1c80d53a2aaa474516474f1fd8373e79
[thirdparty/hostap.git] / tests / hwsim / utils.py
1 # Testing utilities
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import binascii
8 import os
9 import struct
10 import subprocess
11 import time
12 import remotehost
13 import logging
14 logger = logging.getLogger()
15
16 def get_ifnames():
17 ifnames = []
18 with open("/proc/net/dev", "r") as f:
19 lines = f.readlines()
20 for l in lines:
21 val = l.split(':', 1)
22 if len(val) == 2:
23 ifnames.append(val[0].strip(' '))
24 return ifnames
25
26 class HwsimSkip(Exception):
27 def __init__(self, reason):
28 self.reason = reason
29 def __str__(self):
30 return self.reason
31
32 class alloc_fail(object):
33 def __init__(self, dev, count, funcs):
34 self._dev = dev
35 self._count = count
36 self._funcs = funcs
37 def __enter__(self):
38 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
39 if "OK" not in self._dev.request(cmd):
40 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
41 def __exit__(self, type, value, traceback):
42 if type is None:
43 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
44 raise Exception("Allocation failure did not trigger")
45
46 class fail_test(object):
47 def __init__(self, dev, count, funcs):
48 self._dev = dev
49 self._count = count
50 self._funcs = funcs
51 def __enter__(self):
52 cmd = "TEST_FAIL %d:%s" % (self._count, self._funcs)
53 if "OK" not in self._dev.request(cmd):
54 raise HwsimSkip("TEST_FAIL not supported")
55 def __exit__(self, type, value, traceback):
56 if type is None:
57 if self._dev.request("GET_FAIL") != "0:%s" % self._funcs:
58 raise Exception("Test failure did not trigger")
59
60 def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40):
61 for i in range(0, max_iter):
62 if dev.request(cmd).startswith("0:"):
63 break
64 if i == max_iter - 1:
65 raise Exception(note)
66 time.sleep(0.05)
67
68 def require_under_vm():
69 with open('/proc/1/cmdline', 'r') as f:
70 cmd = f.read()
71 if "inside.sh" not in cmd:
72 raise HwsimSkip("Not running under VM")
73
74 def iface_is_in_bridge(bridge, ifname):
75 fname = "/sys/class/net/"+ifname+"/brport/bridge"
76 if not os.path.exists(fname):
77 return False
78 if not os.path.islink(fname):
79 return False
80 truebridge = os.path.basename(os.readlink(fname))
81 if bridge == truebridge:
82 return True
83 return False
84
85 def skip_with_fips(dev, reason="Not supported in FIPS mode"):
86 res = dev.get_capability("fips")
87 if res and 'FIPS' in res:
88 raise HwsimSkip(reason)
89
90 def get_phy(ap, ifname=None):
91 phy = "phy3"
92 try:
93 hostname = ap['hostname']
94 except:
95 hostname = None
96 host = remotehost.Host(hostname)
97
98 if ifname == None:
99 ifname = ap['ifname']
100 status, buf = host.execute(["iw", "dev", ifname, "info"])
101 if status != 0:
102 raise Exception("iw " + ifname + " info failed")
103 lines = buf.split("\n")
104 for line in lines:
105 if "wiphy" in line:
106 words = line.split()
107 phy = "phy" + words[1]
108 break
109 return phy
110
111 def parse_ie(buf):
112 ret = {}
113 data = binascii.unhexlify(buf)
114 while len(data) >= 2:
115 ie, elen = struct.unpack('BB', data[0:2])
116 data = data[2:]
117 if elen > len(data):
118 break
119 ret[ie] = data[0:elen]
120 data = data[elen:]
121 return ret
122
123 def wait_regdom_changes(dev):
124 for i in range(10):
125 ev = dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1)
126 if ev is None:
127 break
128
129 def clear_country(dev):
130 logger.info("Try to clear country")
131 id = dev[1].add_network()
132 dev[1].set_network(id, "mode", "2")
133 dev[1].set_network_quoted(id, "ssid", "country-clear")
134 dev[1].set_network(id, "key_mgmt", "NONE")
135 dev[1].set_network(id, "frequency", "2412")
136 dev[1].set_network(id, "scan_freq", "2412")
137 dev[1].select_network(id)
138 ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
139 if ev:
140 dev[0].connect("country-clear", key_mgmt="NONE", scan_freq="2412")
141 dev[1].request("DISCONNECT")
142 dev[0].wait_disconnected()
143 dev[0].request("DISCONNECT")
144 dev[0].request("ABORT_SCAN")
145 time.sleep(1)
146 dev[0].dump_monitor()
147 dev[1].dump_monitor()
148
149 def clear_regdom(hapd, dev, count=1):
150 if hapd:
151 hapd.request("DISABLE")
152 time.sleep(0.1)
153 for i in range(count):
154 dev[i].request("DISCONNECT")
155 for i in range(count):
156 dev[i].disconnect_and_stop_scan()
157 subprocess.call(['iw', 'reg', 'set', '00'])
158 wait_regdom_changes(dev[0])
159 country = dev[0].get_driver_status_field("country")
160 logger.info("Country code at the end: " + country)
161 if country != "00":
162 clear_country(dev)
163 for i in range(count):
164 dev[i].flush_scan_cache()