]>
Commit | Line | Data |
---|---|---|
41a256ec AN |
1 | # FST tests related definitions |
2 | # Copyright (c) 2015, Qualcomm Atheros, Inc. | |
3 | # | |
4 | # This software may be distributed under the terms of the BSD license. | |
5 | # See README for more details. | |
6 | ||
7 | import subprocess | |
003716ec | 8 | import time |
41a256ec AN |
9 | import logging |
10 | ||
11 | import hostapd | |
12 | ||
# Root logger used by the helpers in this module for debug output.
logger = logging.getLogger()

# Default FST group name.
fst_test_def_group = 'fstg0'

# Default frequencies and the matching channel numbers (kept as strings).
fst_test_def_freq_g = '2412'  # Channel 1
fst_test_def_chan_g = '1'
fst_test_def_freq_a = '5180'  # Channel 36
fst_test_def_chan_a = '36'

# Default low/high priority values.
fst_test_def_prio_low = '100'
fst_test_def_prio_high = '110'

# Default LLT value (presumably the FST link loss timeout - confirm units
# against the callers).
fst_test_def_llt = '100'

# Regulatory domain that the tests restore ('00').
fst_test_def_reg_domain = '00'
41a256ec AN |
24 | |
class HapdRegCtrl:
    """Track a hostapd-triggered regulatory domain change and undo it.

    APs are registered with add_ap(); start() waits until the registered
    interface has finished its regulatory update; stop() restores the
    regulatory domain to fst_test_def_reg_domain if start() recorded a
    change.
    """

    def __init__(self):
        # NOTE(review): refcnt is only initialised here and read in
        # __del__; nothing in this class modifies it.
        self.refcnt = 0
        # Interface whose hostapd instance start() will poll; set by
        # add_ap() for channels that may need a regulatory change.
        self.ifname = None
        # True once wait_hapd_reg_change() succeeded and the regulatory
        # domain still needs to be restored.
        self.changed = False

    def __del__(self):
        # Restore on destruction only when references are still counted
        # and a change is pending (see the refcnt note in __init__).
        if self.refcnt != 0 and self.changed:
            self.restore_reg_domain()

    def start(self):
        """Wait for the regulatory change on the registered interface.

        Does nothing when no interface was registered through add_ap().
        Raises whatever wait_hapd_reg_change() raises.
        """
        if self.ifname is not None:
            hapd = hostapd.Hostapd(self.ifname)
            self.changed = self.wait_hapd_reg_change(hapd)

    def stop(self):
        """Restore the regulatory domain if start() recorded a change."""
        if self.changed:
            self.restore_reg_domain()
            self.changed = False

    def add_ap(self, ifname, chan):
        """Remember ifname if chan may require a regulatory change.

        Ignored once a change has already been recorded. A later
        qualifying call overwrites the previously remembered interface.
        """
        if not self.changed and self.channel_may_require_reg_change(chan):
            self.ifname = ifname

    @staticmethod
    def channel_may_require_reg_change(chan):
        """Return True for channel numbers above 14.

        chan may be an int or a numeric string; it is passed to int().
        """
        return int(chan) > 14

    @staticmethod
    def wait_hapd_reg_change(hapd):
        """Wait until hapd has left COUNTRY_UPDATE and is ENABLED.

        Returns True when the interface is (or becomes) ENABLED.
        Raises Exception when the interface is in an unexpected state or
        AP-ENABLED is not seen within 10 seconds.
        """
        state = hapd.get_status_field("state")
        if state != "COUNTRY_UPDATE":
            # The state is queried a second time before being judged;
            # presumably to pick up a transition that completed in the
            # meantime - confirm before removing the re-read.
            state = hapd.get_status_field("state")
            if state != "ENABLED":
                raise Exception("Unexpected interface state - expected COUNTRY_UPDATE")
            logger.debug("fst hostapd: regulatory domain already set")
            return True

        logger.debug("fst hostapd: waiting for regulatory domain to be set...")

        ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
        if not ev:
            raise Exception("AP setup timed out")

        logger.debug("fst hostapd: regulatory domain set")

        state = hapd.get_status_field("state")
        if state != "ENABLED":
            raise Exception("Unexpected interface state - expected ENABLED")

        logger.debug("fst hostapd: regulatory domain ready")
        return True

    @staticmethod
    def restore_reg_domain():
        """Set the regulatory domain back to fst_test_def_reg_domain.

        Raises Exception when the 'iw reg set' command exits non-zero.
        """
        logger.debug("fst hostapd: waiting for regulatory domain to be restored...")

        res = subprocess.call(['iw', 'reg', 'set', fst_test_def_reg_domain])
        if res != 0:
            raise Exception("Cannot restore regulatory domain")

        logger.debug("fst hostapd: regulatory domain ready")
003716ec JM |
90 | |
def fst_clear_regdom():
    """Reset the regulatory domain to '00' unless it is already set.

    Runs 'iw reg get' and, when its output does not contain 'country 00:',
    runs 'iw reg set 00' followed by a short pause.
    """
    cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
    # communicate() drains stdout, closes the pipe and waits for the child,
    # so the 'iw' process is reaped instead of being left behind.
    res = cmd.communicate()[0].decode()
    if "country 00:" not in res:
        subprocess.call(['iw', 'reg', 'set', '00'])
        # Brief pause after changing the domain (presumably to let the
        # change take effect before the caller continues).
        time.sleep(0.1)