]>
git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/utils.py
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
15 logger
= logging
.getLogger()
19 with
open("/proc/net/dev", "r") as f
:
24 ifnames
.append(val
[0].strip(' '))
27 class HwsimSkip(Exception):
28 def __init__(self
, reason
):
33 class alloc_fail(object):
34 def __init__(self
, dev
, count
, funcs
):
39 cmd
= "TEST_ALLOC_FAIL %d:%s" % (self
._count
, self
._funcs
)
40 if "OK" not in self
._dev
.request(cmd
):
41 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
42 def __exit__(self
, type, value
, traceback
):
44 if self
._dev
.request("GET_ALLOC_FAIL") != "0:%s" % self
._funcs
:
45 raise Exception("Allocation failure did not trigger")
47 class fail_test(object):
48 def __init__(self
, dev
, count
, funcs
):
53 cmd
= "TEST_FAIL %d:%s" % (self
._count
, self
._funcs
)
54 if "OK" not in self
._dev
.request(cmd
):
55 raise HwsimSkip("TEST_FAIL not supported")
56 def __exit__(self
, type, value
, traceback
):
58 if self
._dev
.request("GET_FAIL") != "0:%s" % self
._funcs
:
59 raise Exception("Test failure did not trigger")
61 def wait_fail_trigger(dev
, cmd
, note
="Failure not triggered", max_iter
=40):
62 for i
in range(0, max_iter
):
63 if dev
.request(cmd
).startswith("0:"):
69 def require_under_vm():
70 with
open('/proc/1/cmdline', 'r') as f
:
72 if "inside.sh" not in cmd
:
73 raise HwsimSkip("Not running under VM")
75 def iface_is_in_bridge(bridge
, ifname
):
76 fname
= "/sys/class/net/"+ifname
+"/brport/bridge"
77 if not os
.path
.exists(fname
):
79 if not os
.path
.islink(fname
):
81 truebridge
= os
.path
.basename(os
.readlink(fname
))
82 if bridge
== truebridge
:
def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Skip the running test case when the device reports FIPS mode.

    dev: object providing get_capability(); it is queried with "fips".
    reason: message carried by the raised HwsimSkip.

    Raises HwsimSkip when the returned capability value is truthy and
    contains the substring 'FIPS'. Returns None otherwise.
    """
    res = dev.get_capability("fips")
    # A falsy result (None or empty string) means there is nothing to check.
    if res and 'FIPS' in res:
        raise HwsimSkip(reason)
91 def get_phy(ap
, ifname
=None):
94 hostname
= ap
['hostname']
97 host
= remotehost
.Host(hostname
)
100 ifname
= ap
['ifname']
101 status
, buf
= host
.execute(["iw", "dev", ifname
, "info"])
103 raise Exception("iw " + ifname
+ " info failed")
104 lines
= buf
.split("\n")
108 phy
= "phy" + words
[1]
114 data
= binascii
.unhexlify(buf
)
115 while len(data
) >= 2:
116 ie
, elen
= struct
.unpack('BB', data
[0:2])
120 ret
[ie
] = data
[0:elen
]
124 def wait_regdom_changes(dev
):
126 ev
= dev
.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=0.1)
130 def clear_country(dev
):
131 logger
.info("Try to clear country")
132 id = dev
[1].add_network()
133 dev
[1].set_network(id, "mode", "2")
134 dev
[1].set_network_quoted(id, "ssid", "country-clear")
135 dev
[1].set_network(id, "key_mgmt", "NONE")
136 dev
[1].set_network(id, "frequency", "2412")
137 dev
[1].set_network(id, "scan_freq", "2412")
138 dev
[1].select_network(id)
139 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"])
141 dev
[0].connect("country-clear", key_mgmt
="NONE", scan_freq
="2412")
142 dev
[1].request("DISCONNECT")
143 dev
[0].wait_disconnected()
144 dev
[0].request("DISCONNECT")
145 dev
[0].request("ABORT_SCAN")
147 dev
[0].dump_monitor()
148 dev
[1].dump_monitor()
150 def clear_regdom(hapd
, dev
, count
=1):
152 hapd
.request("DISABLE")
154 for i
in range(count
):
155 dev
[i
].request("DISCONNECT")
156 for i
in range(count
):
157 dev
[i
].disconnect_and_stop_scan()
158 subprocess
.call(['iw', 'reg', 'set', '00'])
159 wait_regdom_changes(dev
[0])
160 country
= dev
[0].get_driver_status_field("country")
161 logger
.info("Country code at the end: " + country
)
164 for i
in range(count
):
165 dev
[i
].flush_scan_cache()
167 def radiotap_build():
168 radiotap_payload
= struct
.pack('BB', 0x08, 0)
169 radiotap_payload
+= struct
.pack('BB', 0, 0)
170 radiotap_payload
+= struct
.pack('BB', 0, 0)
171 radiotap_hdr
= struct
.pack('<BBHL', 0, 0, 8 + len(radiotap_payload
),
173 return radiotap_hdr
+ radiotap_payload
175 def start_monitor(ifname
, freq
=2412):
176 subprocess
.check_call(["iw", ifname
, "set", "type", "monitor"])
177 subprocess
.call(["ip", "link", "set", "dev", ifname
, "up"])
178 subprocess
.check_call(["iw", ifname
, "set", "freq", str(freq
)])
181 sock
= socket
.socket(socket
.AF_PACKET
, socket
.SOCK_RAW
,
182 socket
.htons(ETH_P_ALL
))
183 sock
.bind((ifname
, 0))
def stop_monitor(ifname):
    """Bring the interface down and set its type back to managed.

    ifname: network interface name passed to the ip and iw commands.

    Both commands are run with subprocess.call(), so their exit status is
    not checked and a failing command does not raise.
    """
    subprocess.call(["ip", "link", "set", "dev", ifname, "down"])
    subprocess.call(["iw", ifname, "set", "type", "managed"])