1# FST tests related definitions
2# Copyright (c) 2015, Qualcomm Atheros, Inc.
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import subprocess
8import time
9import logging
10
11import hostapd
12
13logger = logging.getLogger()
14
15fst_test_def_group = 'fstg0'
16fst_test_def_freq_g = '2412' # Channel 1
17fst_test_def_freq_a = '5180' # Channel 36
18fst_test_def_chan_g = '1'
19fst_test_def_chan_a = '36'
20fst_test_def_prio_low = '100'
21fst_test_def_prio_high = '110'
22fst_test_def_llt = '100'
23fst_test_def_reg_domain = '00'
24
25class HapdRegCtrl:
26    def __init__(self):
27        self.ifname = None
28        self.changed = False
29
30    def start(self):
31        if self.ifname != None:
32             hapd = hostapd.Hostapd(self.ifname)
33             self.changed = self.wait_hapd_reg_change(hapd)
34
35    def stop(self):
36        if self.changed == True:
37            self.restore_reg_domain()
38            self.changed = False
39
40    def add_ap(self, ifname, chan):
41        if self.changed == False and self.channel_may_require_reg_change(chan):
42             self.ifname = ifname
43
44    @staticmethod
45    def channel_may_require_reg_change(chan):
46        if int(chan) > 14:
47            return True
48        return False
49
50    @staticmethod
51    def wait_hapd_reg_change(hapd):
52        state = hapd.get_status_field("state")
53        if state != "COUNTRY_UPDATE":
54            state = hapd.get_status_field("state")
55            if state != "ENABLED":
56                raise Exception("Unexpected interface state - expected COUNTRY_UPDATE")
57            else:
58                logger.debug("fst hostapd: regulatory domain already set")
59                return True
60
61        logger.debug("fst hostapd: waiting for regulatory domain to be set...")
62
63        ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
64        if not ev:
65            raise Exception("AP setup timed out")
66
67        logger.debug("fst hostapd: regulatory domain set")
68
69        state = hapd.get_status_field("state")
70        if state != "ENABLED":
71            raise Exception("Unexpected interface state - expected ENABLED")
72
73        logger.debug("fst hostapd: regulatory domain ready")
74        return True
75
76    @staticmethod
77    def restore_reg_domain():
78        logger.debug("fst hostapd: waiting for regulatory domain to be restored...")
79
80        res = subprocess.call(['iw', 'reg', 'set', fst_test_def_reg_domain])
81        if res != 0:
82            raise Exception("Cannot restore regulatory domain")
83
84        logger.debug("fst hostapd: regulatory domain ready")
85
86def fst_clear_regdom():
87    cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
88    res = cmd.stdout.read().decode()
89    cmd.stdout.close()
90    if "country 00:" not in res:
91        subprocess.call(['iw', 'reg', 'set', '00'])
92        time.sleep(0.1)
93