]>
git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/utils.py
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
15 logger
= logging
.getLogger()
19 with
open("/proc/net/dev", "r") as f
:
24 ifnames
.append(val
[0].strip(' '))
27 class HwsimSkip(Exception):
28 def __init__(self
, reason
):
33 class alloc_fail(object):
34 def __init__(self
, dev
, count
, funcs
):
39 cmd
= "TEST_ALLOC_FAIL %d:%s" % (self
._count
, self
._funcs
)
40 if "OK" not in self
._dev
.request(cmd
):
41 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
42 def __exit__(self
, type, value
, traceback
):
44 if self
._dev
.request("GET_ALLOC_FAIL") != "0:%s" % self
._funcs
:
45 raise Exception("Allocation failure did not trigger")
47 class fail_test(object):
48 def __init__(self
, dev
, count
, funcs
):
53 cmd
= "TEST_FAIL %d:%s" % (self
._count
, self
._funcs
)
54 if "OK" not in self
._dev
.request(cmd
):
55 raise HwsimSkip("TEST_FAIL not supported")
56 def __exit__(self
, type, value
, traceback
):
58 if self
._dev
.request("GET_FAIL") != "0:%s" % self
._funcs
:
59 raise Exception("Test failure did not trigger")
61 def wait_fail_trigger(dev
, cmd
, note
="Failure not triggered", max_iter
=40,
63 for i
in range(0, max_iter
):
64 if dev
.request(cmd
).startswith("0:"):
70 def require_under_vm():
71 with
open('/proc/1/cmdline', 'r') as f
:
73 if "inside.sh" not in cmd
:
74 raise HwsimSkip("Not running under VM")
76 def iface_is_in_bridge(bridge
, ifname
):
77 fname
= "/sys/class/net/"+ifname
+"/brport/bridge"
78 if not os
.path
.exists(fname
):
80 if not os
.path
.islink(fname
):
82 truebridge
= os
.path
.basename(os
.readlink(fname
))
83 if bridge
== truebridge
:
87 def skip_with_fips(dev
, reason
="Not supported in FIPS mode"):
88 res
= dev
.get_capability("fips")
89 if res
and 'FIPS' in res
:
90 raise HwsimSkip(reason
)
92 def get_phy(ap
, ifname
=None):
95 hostname
= ap
['hostname']
98 host
= remotehost
.Host(hostname
)
101 ifname
= ap
['ifname']
102 status
, buf
= host
.execute(["iw", "dev", ifname
, "info"])
104 raise Exception("iw " + ifname
+ " info failed")
105 lines
= buf
.split("\n")
109 phy
= "phy" + words
[1]
115 data
= binascii
.unhexlify(buf
)
116 while len(data
) >= 2:
117 ie
, elen
= struct
.unpack('BB', data
[0:2])
121 ret
[ie
] = data
[0:elen
]
125 def wait_regdom_changes(dev
):
127 ev
= dev
.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=0.1)
131 def clear_country(dev
):
132 logger
.info("Try to clear country")
133 id = dev
[1].add_network()
134 dev
[1].set_network(id, "mode", "2")
135 dev
[1].set_network_quoted(id, "ssid", "country-clear")
136 dev
[1].set_network(id, "key_mgmt", "NONE")
137 dev
[1].set_network(id, "frequency", "2412")
138 dev
[1].set_network(id, "scan_freq", "2412")
139 dev
[1].select_network(id)
140 ev
= dev
[1].wait_event(["CTRL-EVENT-CONNECTED"])
142 dev
[0].connect("country-clear", key_mgmt
="NONE", scan_freq
="2412")
143 dev
[1].request("DISCONNECT")
144 dev
[0].wait_disconnected()
145 dev
[0].request("DISCONNECT")
146 dev
[0].request("ABORT_SCAN")
148 dev
[0].dump_monitor()
149 dev
[1].dump_monitor()
151 def clear_regdom(hapd
, dev
, count
=1):
153 clear_regdom_dev(dev
, count
)
155 def disable_hapd(hapd
):
157 hapd
.request("DISABLE")
160 def clear_regdom_dev(dev
, count
=1):
161 for i
in range(count
):
162 dev
[i
].request("DISCONNECT")
163 for i
in range(count
):
164 dev
[i
].disconnect_and_stop_scan()
165 dev
[0].cmd_execute(['iw', 'reg', 'set', '00'])
166 wait_regdom_changes(dev
[0])
167 country
= dev
[0].get_driver_status_field("country")
168 logger
.info("Country code at the end: " + country
)
171 for i
in range(count
):
172 dev
[i
].flush_scan_cache()
174 def radiotap_build():
175 radiotap_payload
= struct
.pack('BB', 0x08, 0)
176 radiotap_payload
+= struct
.pack('BB', 0, 0)
177 radiotap_payload
+= struct
.pack('BB', 0, 0)
178 radiotap_hdr
= struct
.pack('<BBHL', 0, 0, 8 + len(radiotap_payload
),
180 return radiotap_hdr
+ radiotap_payload
182 def start_monitor(ifname
, freq
=2412):
183 subprocess
.check_call(["iw", ifname
, "set", "type", "monitor"])
184 subprocess
.call(["ip", "link", "set", "dev", ifname
, "up"])
185 subprocess
.check_call(["iw", ifname
, "set", "freq", str(freq
)])
188 sock
= socket
.socket(socket
.AF_PACKET
, socket
.SOCK_RAW
,
189 socket
.htons(ETH_P_ALL
))
190 sock
.bind((ifname
, 0))
194 def stop_monitor(ifname
):
195 subprocess
.call(["ip", "link", "set", "dev", ifname
, "down"])
196 subprocess
.call(["iw", ifname
, "set", "type", "managed"])