]>
Commit | Line | Data |
---|---|---|
835a546b | 1 | # Python class for controlling wlantest |
fce6ddd8 | 2 | # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi> |
835a546b JM |
3 | # |
4 | # This software may be distributed under the terms of the BSD license. | |
5 | # See README for more details. | |
6 | ||
8efc83d4 | 7 | import re |
835a546b | 8 | import os |
8efc83d4 | 9 | import posixpath |
835a546b JM |
10 | import time |
11 | import subprocess | |
12 | import logging | |
13 | import wpaspy | |
14 | ||
c9aa4308 | 15 | logger = logging.getLogger() |
835a546b JM |
16 | |
class Wlantest:
    """Wrapper around wlantest and wlantest_cli.

    Two flows are supported:

    * Local flow (no remote host registered): wlantest_cli is executed as a
      local subprocess.
    * Remote flow (after register_remote_wlantest()): wlantest is started on
      the registered host and wlantest_cli commands are executed through the
      host object.

    The remote configuration is stored in class attributes, so it is shared
    by every instance of this class.
    """

    # Host object registered with register_remote_wlantest(); None selects
    # the local flow.
    remote_host = None
    # Dictionary with the keys read by this class: 'wlantest',
    # 'wlantest_cli', 'log_dir' and 'tc_name'.
    setup_params = None
    # Handle returned by remote_host.execute_run() while wlantest is running
    # on the remote host.
    exe_thread = None
    # Result container handed to remote_host.execute_run().
    exe_res = []
    # Module providing setup() and get_monitor_params() for the monitor
    # interface.
    monitor_mod = None
    # Set by setup(); __init__() refuses to run before that.
    setup_done = False

    @classmethod
    def stop_remote_wlantest(cls):
        """Kill the remote wlantest process, if one was started."""
        if cls.exe_thread is None:
            # Local flow - no need for remote operations
            return

        cls.remote_host.execute(["killall", "-9", "wlantest"])
        cls.remote_host.wait_execute_complete(cls.exe_thread, 5)
        cls.exe_thread = None
        cls.exe_res = []

    @classmethod
    def reset_remote_wlantest(cls):
        """Stop the remote wlantest and clear all class-level state."""
        cls.stop_remote_wlantest()
        cls.remote_host = None
        cls.setup_params = None
        cls.exe_thread = None
        cls.exe_res = []
        cls.monitor_mod = None
        cls.setup_done = False

    @classmethod
    def _remote_capture_cmd(cls):
        """Build the wlantest command line for the registered remote host.

        The first interface listed in remote_host.ifname is used. The log and
        pcapng file names are derived from the test case name, the host name
        and that interface name.

        Returns a (cmd, log_file, pcap_file) tuple where cmd is an argument
        list.
        """
        log_dir = cls.setup_params['log_dir']
        ifaces = re.split('; | |, ', cls.remote_host.ifname)
        ifname = ifaces[0]
        exe = cls.setup_params["wlantest"]
        tc_name = cls.setup_params["tc_name"]
        base_log_name = tc_name + "_wlantest_" + \
            cls.remote_host.name + "_" + ifname
        log_file = posixpath.join(log_dir, base_log_name + ".log")
        pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng")
        # Build the argument list directly instead of formatting a string and
        # splitting it on whitespace, so that paths containing whitespace
        # remain single arguments.
        cmd = [exe, "-i", ifname, "-n", pcap_file, "-c", "-dtN", "-L",
               log_file]
        return cmd, log_file, pcap_file

    @classmethod
    def start_remote_wlantest(cls):
        """Start wlantest on the registered remote host.

        Does nothing in the local flow. Raises Exception if wlantest has
        already been started.
        """
        if cls.remote_host is None:
            # Local flow - no need for remote operations
            return
        if cls.exe_thread is not None:
            raise Exception("Cannot start wlantest twice")

        cmd, log_file, pcap_file = cls._remote_capture_cmd()
        cls.remote_host.add_log(log_file)
        cls.remote_host.add_log(pcap_file)
        cls.exe_thread = cls.remote_host.execute_run(cmd, cls.exe_res)
        # Give wlantest a chance to start working
        time.sleep(1)

    @classmethod
    def register_remote_wlantest(cls, host, setup_params, monitor_mod):
        """Register the remote host on which wlantest is to be run.

        Verifies with "which" that both setup_params['wlantest'] and
        setup_params['wlantest_cli'] can be found on the host.

        Raises Exception if a host is already registered or if either
        executable is not found.
        """
        if cls.remote_host is not None:
            raise Exception("Cannot register remote wlantest twice")
        cls.remote_host = host
        cls.setup_params = setup_params
        cls.monitor_mod = monitor_mod
        status, buf = host.execute(["which", setup_params['wlantest']])
        if status != 0:
            raise Exception(host.name + " - wlantest: " + buf)
        status, buf = host.execute(["which", setup_params['wlantest_cli']])
        if status != 0:
            raise Exception(host.name + " - wlantest_cli: " + buf)

    @classmethod
    def chan_from_wpa(cls, wpa, is_p2p=False):
        """Set up the remote monitor using parameters derived from wpa.

        Returns the result of monitor_mod.setup(), or None when no monitor
        module has been registered.
        """
        if cls.monitor_mod is None:
            return
        m = cls.monitor_mod
        return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)])

    @classmethod
    def setup(cls, wpa, is_p2p=False):
        """Prepare the monitor, start remote wlantest and mark setup done."""
        cls.chan_from_wpa(wpa, is_p2p)
        cls.start_remote_wlantest()
        cls.setup_done = True

    def __init__(self):
        """Select the wlantest_cli binary used in the local flow.

        Raises Exception if setup() has not been called yet.
        """
        if not self.setup_done:
            raise Exception("Cannot create Wlantest instance before setup()")
        if os.path.isfile('../../wlantest/wlantest_cli'):
            self.wlantest_cli = '../../wlantest/wlantest_cli'
        else:
            self.wlantest_cli = 'wlantest_cli'

    def cli_cmd(self, params):
        """Run wlantest_cli with the given argument list and return its output.

        In the remote flow the command is executed through remote_host and an
        Exception is raised on a non-zero status. In the local flow the
        command is run as a subprocess and its output is decoded to text so
        that callers can use string operations on it.
        """
        if self.remote_host is not None:
            exe = self.setup_params["wlantest_cli"]
            ret = self.remote_host.execute([exe] + params)
            if ret[0] != 0:
                raise Exception("wlantest_cli failed")
            return ret[1]
        # subprocess.check_output() returns bytes on Python 3; without the
        # decode every '"FAIL" in res' check in this class raises TypeError.
        return subprocess.check_output([self.wlantest_cli] + params).decode()

    def _checked_cli_cmd(self, params, failure):
        """Run cli_cmd() and raise Exception(failure) if output has FAIL."""
        res = self.cli_cmd(params)
        if "FAIL" in res:
            raise Exception(failure)
        return res

    def flush(self):
        """Issue the wlantest_cli "flush" command."""
        self._checked_cli_cmd(["flush"], "wlantest_cli flush failed")

    def relog(self):
        """Issue the wlantest_cli "relog" command."""
        self._checked_cli_cmd(["relog"], "wlantest_cli relog failed")

    def add_passphrase(self, passphrase):
        """Issue the wlantest_cli "add_passphrase" command."""
        self._checked_cli_cmd(["add_passphrase", passphrase],
                              "wlantest_cli add_passphrase failed")

    def add_wepkey(self, key):
        """Issue the wlantest_cli "add_wepkey" command."""
        self._checked_cli_cmd(["add_wepkey", key],
                              "wlantest_cli add_wepkey failed")

    def info_bss(self, field, bssid):
        """Return the output of "info_bss" for the given field and BSSID."""
        return self._checked_cli_cmd(
            ["info_bss", field, bssid],
            "Could not get BSS info from wlantest for " + bssid)

    def get_bss_counter(self, field, bssid):
        """Return a BSS counter as an integer.

        This is best effort: 0 is returned when the command cannot be run or
        reports FAIL.
        """
        try:
            res = self.cli_cmd(["get_bss_counter", field, bssid])
        except Exception:
            return 0
        if "FAIL" in res:
            return 0
        return int(res)

    def clear_bss_counters(self, bssid):
        """Issue "clear_bss_counters"; the command output is not checked."""
        self.cli_cmd(["clear_bss_counters", bssid])

    def info_sta(self, field, bssid, addr):
        """Return the output of "info_sta" for the given field and STA."""
        return self._checked_cli_cmd(
            ["info_sta", field, bssid, addr],
            "Could not get STA info from wlantest for " + addr)

    def get_sta_counter(self, field, bssid, addr):
        """Return a STA counter as an integer; raise Exception on FAIL."""
        res = self._checked_cli_cmd(["get_sta_counter", field, bssid, addr],
                                    "wlantest_cli command failed")
        return int(res)

    def clear_sta_counters(self, bssid, addr):
        """Issue "clear_sta_counters"; raise Exception on FAIL."""
        self._checked_cli_cmd(["clear_sta_counters", bssid, addr],
                              "wlantest_cli command failed")

    def tdls_clear(self, bssid, addr1, addr2):
        """Issue "clear_tdls_counters"; the command output is not checked."""
        self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2])

    def get_tdls_counter(self, field, bssid, addr1, addr2):
        """Return a TDLS counter as an integer; raise Exception on FAIL."""
        res = self._checked_cli_cmd(
            ["get_tdls_counter", field, bssid, addr1, addr2],
            "wlantest_cli command failed")
        return int(res)

    def require_ap_pmf_mandatory(self, bssid):
        """Raise unless the BSS reports MFPR, MFPC and the PSK-SHA256 AKM."""
        res = self.info_bss("rsn_capab", bssid)
        if "MFPR" not in res:
            raise Exception("AP did not require PMF")
        if "MFPC" not in res:
            raise Exception("AP did not enable PMF")
        res = self.info_bss("key_mgmt", bssid)
        if "PSK-SHA256" not in res:
            raise Exception("AP did not enable SHA256-based AKM for PMF")

    def require_ap_pmf_optional(self, bssid):
        """Raise unless the BSS reports MFPC without MFPR."""
        res = self.info_bss("rsn_capab", bssid)
        if "MFPR" in res:
            raise Exception("AP required PMF")
        if "MFPC" not in res:
            raise Exception("AP did not enable PMF")

    def require_ap_no_pmf(self, bssid):
        """Raise if the BSS reports either MFPR or MFPC."""
        res = self.info_bss("rsn_capab", bssid)
        if "MFPR" in res:
            raise Exception("AP required PMF")
        if "MFPC" in res:
            raise Exception("AP enabled PMF")

    def require_sta_pmf_mandatory(self, bssid, addr):
        """Raise unless the STA reports both MFPR and MFPC."""
        res = self.info_sta("rsn_capab", bssid, addr)
        if "MFPR" not in res:
            raise Exception("STA did not require PMF")
        if "MFPC" not in res:
            raise Exception("STA did not enable PMF")

    def require_sta_pmf(self, bssid, addr):
        """Raise unless the STA reports MFPC."""
        res = self.info_sta("rsn_capab", bssid, addr)
        if "MFPC" not in res:
            raise Exception("STA did not enable PMF")

    def require_sta_no_pmf(self, bssid, addr):
        """Raise if the STA reports MFPC."""
        res = self.info_sta("rsn_capab", bssid, addr)
        if "MFPC" in res:
            raise Exception("STA enabled PMF")

    def require_sta_key_mgmt(self, bssid, addr, key_mgmt):
        """Raise unless key_mgmt appears in the STA key_mgmt information."""
        res = self.info_sta("key_mgmt", bssid, addr)
        if key_mgmt not in res:
            raise Exception("Unexpected STA key_mgmt")

    def get_tx_tid(self, bssid, addr, tid):
        """Return the "get_tx_tid" value for one TID as an integer."""
        res = self._checked_cli_cmd(["get_tx_tid", bssid, addr, str(tid)],
                                    "wlantest_cli command failed")
        return int(res)

    def get_rx_tid(self, bssid, addr, tid):
        """Return the "get_rx_tid" value for one TID as an integer."""
        res = self._checked_cli_cmd(["get_rx_tid", bssid, addr, str(tid)],
                                    "wlantest_cli command failed")
        return int(res)

    def get_tid_counters(self, bssid, addr):
        """Return [tx, rx] dictionaries mapping TID 0..16 to its counter."""
        tx = {}
        rx = {}
        for tid in range(0, 17):
            tx[tid] = self.get_tx_tid(bssid, addr, tid)
            rx[tid] = self.get_rx_tid(bssid, addr, tid)
        return [tx, rx]