]>
git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/utils.py
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
15 logger
= logging
.getLogger()
19 with
open("/proc/net/dev", "r") as f
:
24 ifnames
.append(val
[0].strip(' '))
27 class HwsimSkip(Exception):
28 def __init__(self
, reason
):
33 class alloc_fail(object):
34 def __init__(self
, dev
, count
, funcs
):
39 cmd
= "TEST_ALLOC_FAIL %d:%s" % (self
._count
, self
._funcs
)
40 if "OK" not in self
._dev
.request(cmd
):
41 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
42 def __exit__(self
, type, value
, traceback
):
44 if self
._dev
.request("GET_ALLOC_FAIL") != "0:%s" % self
._funcs
:
45 raise Exception("Allocation failure did not trigger")
47 class fail_test(object):
48 def __init__(self
, dev
, count
, funcs
):
53 cmd
= "TEST_FAIL %d:%s" % (self
._count
, self
._funcs
)
54 if "OK" not in self
._dev
.request(cmd
):
55 raise HwsimSkip("TEST_FAIL not supported")
56 def __exit__(self
, type, value
, traceback
):
58 if self
._dev
.request("GET_FAIL") != "0:%s" % self
._funcs
:
59 raise Exception("Test failure did not trigger")
61 def wait_fail_trigger(dev
, cmd
, note
="Failure not triggered", max_iter
=40,
63 for i
in range(0, max_iter
):
64 if dev
.request(cmd
).startswith("0:"):
70 def require_under_vm():
71 with
open('/proc/1/cmdline', 'r') as f
:
73 if "inside.sh" not in cmd
:
74 raise HwsimSkip("Not running under VM")
76 def iface_is_in_bridge(bridge
, ifname
):
77 fname
= "/sys/class/net/"+ifname
+"/brport/bridge"
78 if not os
.path
.exists(fname
):
80 if not os
.path
.islink(fname
):
82 truebridge
= os
.path
.basename(os
.readlink(fname
))
83 if bridge
== truebridge
:
87 def skip_with_fips(dev
, reason
="Not supported in FIPS mode"):
88 res
= dev
.get_capability("fips")
89 if res
and 'FIPS' in res
:
90 raise HwsimSkip(reason
)
92 def check_ext_key_id_capa(dev
):
93 res
= dev
.get_driver_status_field('capa.flags')
94 if (int(res
, 0) & 0x8000000000000000) == 0:
95 raise HwsimSkip("Extended Key ID not supported")
97 def get_phy(ap
, ifname
=None):
100 hostname
= ap
['hostname']
103 host
= remotehost
.Host(hostname
)
106 ifname
= ap
['ifname']
107 status
, buf
= host
.execute(["iw", "dev", ifname
, "info"])
109 raise Exception("iw " + ifname
+ " info failed")
110 lines
= buf
.split("\n")
114 phy
= "phy" + words
[1]
120 data
= binascii
.unhexlify(buf
)
121 while len(data
) >= 2:
122 ie
, elen
= struct
.unpack('BB', data
[0:2])
126 ret
[ie
] = data
[0:elen
]
130 def wait_regdom_changes(dev
):
132 ev
= dev
.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=0.1)
136 def clear_country(dev
):
137 logger
.info("Try to clear country")
138 id = dev
[1].add_network()
139 dev
[1].set_network(id, "mode", "2")
140 dev
[1].set_network_quoted(id, "ssid", "country-clear")
141 dev
[1].set_network(id, "key_mgmt", "NONE")
142 dev
[1].set_network(id, "frequency", "2412")
143 dev
[1].set_network(id, "scan_freq", "2412")
144 dev
[1].select_network(id)
145 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"])
147 dev
[0].connect("country-clear", key_mgmt
="NONE", scan_freq
="2412")
148 dev
[1].request("DISCONNECT")
149 dev
[0].wait_disconnected()
150 dev
[0].request("DISCONNECT")
151 dev
[0].request("ABORT_SCAN")
153 dev
[0].dump_monitor()
154 dev
[1].dump_monitor()
156 def clear_regdom(hapd
, dev
, count
=1):
158 clear_regdom_dev(dev
, count
)
160 def disable_hapd(hapd
):
162 hapd
.request("DISABLE")
165 def clear_regdom_dev(dev
, count
=1):
166 for i
in range(count
):
167 dev
[i
].request("DISCONNECT")
168 for i
in range(count
):
169 dev
[i
].disconnect_and_stop_scan()
170 dev
[0].cmd_execute(['iw', 'reg', 'set', '00'])
171 wait_regdom_changes(dev
[0])
172 country
= dev
[0].get_driver_status_field("country")
173 logger
.info("Country code at the end: " + country
)
176 for i
in range(count
):
177 dev
[i
].flush_scan_cache()
179 def radiotap_build():
180 radiotap_payload
= struct
.pack('BB', 0x08, 0)
181 radiotap_payload
+= struct
.pack('BB', 0, 0)
182 radiotap_payload
+= struct
.pack('BB', 0, 0)
183 radiotap_hdr
= struct
.pack('<BBHL', 0, 0, 8 + len(radiotap_payload
),
185 return radiotap_hdr
+ radiotap_payload
187 def start_monitor(ifname
, freq
=2412):
188 subprocess
.check_call(["iw", ifname
, "set", "type", "monitor"])
189 subprocess
.call(["ip", "link", "set", "dev", ifname
, "up"])
190 subprocess
.check_call(["iw", ifname
, "set", "freq", str(freq
)])
193 sock
= socket
.socket(socket
.AF_PACKET
, socket
.SOCK_RAW
,
194 socket
.htons(ETH_P_ALL
))
195 sock
.bind((ifname
, 0))
199 def stop_monitor(ifname
):
200 subprocess
.call(["ip", "link", "set", "dev", ifname
, "down"])
201 subprocess
.call(["iw", ifname
, "set", "type", "managed"])