]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/hwsim_utils.py
tests: Use python3 compatible "except" statement
[thirdparty/hostap.git] / tests / hwsim / hwsim_utils.py
CommitLineData
a311c61d 1# hwsim testing utilities
8a9a3b34 2# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
a311c61d
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
280cd8a9 7import os
8a9a3b34 8import time
a311c61d 9import logging
c9aa4308 10logger = logging.getLogger()
a311c61d 11
ce9c8c40
JM
12from wpasupplicant import WpaSupplicant
13
def _data_test_addr(dev, group):
    """Return the address to use for dev in DATA_TEST_TX commands.

    Starts from dev.own_addr(); for a WpaSupplicant instance that is not
    using its group interface, a non-empty 'addr' driver status field takes
    precedence.
    """
    addr = dev.own_addr()
    if not group and isinstance(dev, WpaSupplicant):
        drv_addr = dev.get_driver_status_field('addr')
        if drv_addr:
            addr = drv_addr
    return addr


def _data_test_request(dev, group, cmd):
    """Send cmd through group_request() when group is set, else request()."""
    if group:
        return dev.group_request(cmd)
    return dev.request(cmd)


def _data_test_wait_rx(dev, group, timeout):
    """Wait for a DATA-TEST-RX event on the group or main interface.

    The result is passed back unchanged; callers treat None as "no frame
    was delivered".
    """
    if group:
        return dev.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
    return dev.wait_event(["DATA-TEST-RX"], timeout=timeout)


def _data_test_enable(dev, group, ifname):
    """Send DATA_TEST_CONFIG 1 (optionally bound to ifname) and require OK."""
    cmd = "DATA_TEST_CONFIG 1"
    if ifname:
        cmd = cmd + " ifname=" + ifname
    res = _data_test_request(dev, group, cmd)
    if "OK" not in res:
        raise Exception("Failed to enable data test functionality")


def _data_test_tx_cmd(dst, src, tos, send_len):
    """Build the DATA_TEST_TX command, appending len= only when requested."""
    cmd = "DATA_TEST_TX {} {} {}".format(dst, src, tos)
    if send_len is not None:
        cmd += " len=" + str(send_len)
    return cmd


def _data_test_check_len(ev, send_len, what):
    """Verify that the len= field of ev matches what was requested.

    With send_len set, the event must contain " len=<send_len>"; without it,
    the event must not contain any " len=" field.
    """
    if send_len is not None:
        if " len=" + str(send_len) not in ev:
            raise Exception("Unexpected {} data length".format(what))
    elif " len=" in ev:
        raise Exception("Unexpected {} data length".format(what))


def _data_test_unicast(tx, tx_group, tx_addr, rx, rx_group, rx_addr,
                       tos, timeout, send_len, link):
    """Send one unicast test frame from tx to rx and verify its reception.

    link is the direction label ("dev1->dev2" or "dev2->dev1") used in the
    exception messages.
    """
    cmd = _data_test_tx_cmd(rx_addr, tx_addr, tos, send_len)
    _data_test_request(tx, tx_group, cmd)
    ev = _data_test_wait_rx(rx, rx_group, timeout)
    if ev is None:
        raise Exception("{} unicast data delivery failed".format(link))
    if "DATA-TEST-RX {} {}".format(rx_addr, tx_addr) not in ev:
        raise Exception("Unexpected {} unicast data result".format(link))
    _data_test_check_len(ev, send_len, link + " unicast")


def _data_test_broadcast(tx, tx_group, tx_addr, rx, rx_group, rx_addr,
                         tos, timeout, send_len, link, retries,
                         multicast_to_unicast=False):
    """Send a broadcast test frame from tx and verify rx receives it.

    The whole send/wait/verify sequence is attempted up to retries times;
    only the failure of the final attempt is propagated. When
    multicast_to_unicast is set, the frame is expected to arrive addressed
    to rx_addr instead of the broadcast address.
    """
    cmd = _data_test_tx_cmd("ff:ff:ff:ff:ff:ff", tx_addr, tos, send_len)
    bcast_rx = "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(tx_addr)
    # range() instead of xrange(): xrange does not exist on Python 3.
    for i in range(retries):
        try:
            _data_test_request(tx, tx_group, cmd)
            ev = _data_test_wait_rx(rx, rx_group, timeout)
            if ev is None:
                raise Exception("{} broadcast data delivery failed".format(link))
            if multicast_to_unicast:
                if bcast_rx in ev:
                    raise Exception("Unexpected {} broadcast data result: multicast to unicast conversion missing".format(link))
                if "DATA-TEST-RX {} {}".format(rx_addr, tx_addr) not in ev:
                    raise Exception("Unexpected {} broadcast data result (multicast to unicast enabled)".format(link))
            elif bcast_rx not in ev:
                raise Exception("Unexpected {} broadcast data result".format(link))
            _data_test_check_len(ev, send_len, link + " broadcast")
            break
        except Exception:
            # Swallow failures of all but the last attempt.
            if i == retries - 1:
                raise


def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True, timeout=5,
                          multicast_to_unicast=False, broadcast=True,
                          send_len=None):
    """Run one round of data connectivity checks between dev1 and dev2.

    The sequence is: dev1->dev2 unicast, dev1->dev2 broadcast, dev2->dev1
    unicast, dev2->dev1 broadcast. Frames are injected with DATA_TEST_TX and
    reception is confirmed through DATA-TEST-RX events.

    dev1group / dev2group -- use group_request()/wait_group_event() on that
        device instead of request()/wait_event()
    ifname1 / ifname2 -- optional ifname= argument for DATA_TEST_CONFIG 1
    config -- when true, enable the data test functionality before the test
        and disable it again afterwards (also on failure)
    timeout -- timeout passed to each wait for a DATA-TEST-RX event
    multicast_to_unicast -- expect the dev2->dev1 broadcast frame to be
        received as unicast to dev1
    broadcast -- when false, skip both broadcast checks
    send_len -- optional len= value for DATA_TEST_TX; the received event is
        checked for the same length

    Raises Exception if enabling the test mode fails or any of the checks
    does not see the expected event.
    """
    addr1 = _data_test_addr(dev1, dev1group)
    addr2 = _data_test_addr(dev2, dev2group)

    dev1.dump_monitor()
    dev2.dump_monitor()

    # A single broadcast attempt when neither device has a hostname set;
    # otherwise allow up to ten attempts.
    if dev1.hostname is None and dev2.hostname is None:
        broadcast_retry_c = 1
    else:
        broadcast_retry_c = 10

    try:
        if config:
            _data_test_enable(dev1, dev1group, ifname1)
            _data_test_enable(dev2, dev2group, ifname2)

        _data_test_unicast(dev1, dev1group, addr1, dev2, dev2group, addr2,
                           tos, timeout, send_len, "dev1->dev2")
        if broadcast:
            # multicast_to_unicast is only evaluated for the dev2->dev1
            # direction below.
            _data_test_broadcast(dev1, dev1group, addr1,
                                 dev2, dev2group, addr2,
                                 tos, timeout, send_len, "dev1->dev2",
                                 broadcast_retry_c)

        _data_test_unicast(dev2, dev2group, addr2, dev1, dev1group, addr1,
                           tos, timeout, send_len, "dev2->dev1")
        if broadcast:
            _data_test_broadcast(dev2, dev2group, addr2,
                                 dev1, dev1group, addr1,
                                 tos, timeout, send_len, "dev2->dev1",
                                 broadcast_retry_c,
                                 multicast_to_unicast=multicast_to_unicast)
    finally:
        if config:
            _data_test_request(dev1, dev1group, "DATA_TEST_CONFIG 0")
            _data_test_request(dev2, dev2group, "DATA_TEST_CONFIG 0")
ce9c8c40 179
1131a1c8
JM
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True, timeout=5,
                      multicast_to_unicast=False, success_expected=True,
                      broadcast=True, send_len=None):
    """Check data connectivity between dev1 and dev2, with retries.

    run_connectivity_test() is attempted up to max_tries times, sleeping one
    second between failed attempts. A truthy dscp overrides tos with
    dscp << 2; a missing tos defaults to 0. The remaining arguments are
    forwarded to run_connectivity_test().

    Raises Exception carrying the last failure when success_expected is set
    and no attempt succeeded, or "Unexpected connectivity detected" when
    success_expected is cleared but an attempt succeeded.
    """
    if dscp:
        tos = dscp << 2
    tos = tos or 0

    connected = False
    failure = None
    attempt = 0
    while attempt < max_tries:
        attempt += 1
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config,
                                  timeout=timeout,
                                  multicast_to_unicast=multicast_to_unicast,
                                  broadcast=broadcast, send_len=send_len)
        except Exception as err:
            failure = err
            # No point in waiting after the final attempt.
            if attempt < max_tries:
                time.sleep(1)
        else:
            connected = True
            break

    if success_expected:
        if not connected:
            raise Exception(failure)
    elif connected:
        raise Exception("Unexpected connectivity detected")
a8375c94 209
def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1, timeout=5):
    """Run test_connectivity() with ifname passed as dev2's ifname2."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      max_tries=max_tries, timeout=timeout,
                      ifname2=ifname)
a8375c94 214
def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with dev1group and dev2group both set."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=True)
81266da7 217
def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with only dev1group set (dev2group cleared)."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=False)
b162675f 220
def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with its default (non-group) settings."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos)
d0007ac9
JB
223
# Power save mode values; presumably the values accepted by set_powersave()
# (TODO confirm against the mac80211_hwsim "ps" debugfs file).
PS_DISABLED = 0
PS_ENABLED = 1
PS_AUTO_POLL = 2
PS_MANUAL_POLL = 3
225
def set_powersave(dev, val):
    """Write val to the hwsim "ps" debugfs file of dev's phy.

    The phy name comes from dev's "phyname" driver status field and the
    write is done by running an echo redirect through dev.cmd_execute().
    Raises Exception if that command returns a non-zero result.
    """
    ps_file = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % dev.get_driver_status_field("phyname")
    command = ["echo", '%d' % val, ">", ps_file]
    status, _ = dev.cmd_execute(command, shell=True)
    if status != 0:
        raise Exception("Failed to set power save for device")
f5a270b5
JB
233
def set_group_map(dev, val):
    """Write val to the hwsim "group" debugfs file of dev's phy.

    The phy name comes from dev's "phyname" driver status field and the
    write is done by running an echo redirect through dev.cmd_execute().
    Raises Exception naming the phy if that command returns a non-zero
    result.
    """
    phy_name = dev.get_driver_status_field("phyname")
    group_file = '/sys/kernel/debug/ieee80211/%s/hwsim/group' % phy_name
    command = ["echo", '%d' % val, ">", group_file]
    status, _ = dev.cmd_execute(command, shell=True)
    if status != 0:
        raise Exception("Failed to set group map for %s" % phy_name)
080035d1
BL
241
def set_rx_rssi(dev, val):
    """
    Set the signal strength seen by receivers of frames sent by dev.

    The mac80211_hwsim driver reports RSSI as: TX power - 50, so the TX
    power is fixed (through "iw ... set txpower fixed") to the value that
    yields the requested RSSI.
    Valid RSSI range: -50 to -30.

    Raises Exception if the iw command returns a non-zero result.
    """
    # (val + 50) scaled by 100 -- presumably because iw takes the power in
    # mBm (TODO confirm against iw documentation).
    power_arg = (val + 50) * 100
    iface = dev.get_driver_status_field("ifname")
    command = ['iw', iface, 'set', 'txpower', 'fixed', str(power_arg)]
    status, _ = dev.cmd_execute(command)
    if status != 0:
        raise Exception("Failed to set RSSI to %d" % val)
255
def reset_rx_rssi(dev):
    """Set dev's RX RSSI back to -30 via set_rx_rssi()."""
    set_rx_rssi(dev, val=-30)