1# OCE tests
2# Copyright (c) 2016, Intel Deutschland GmbH
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import logging
8logger = logging.getLogger()
9
10import hostapd
11from wpasupplicant import WpaSupplicant
12
13from hwsim_utils import set_rx_rssi, reset_rx_rssi
14import time
15import os
16from datetime import datetime
17from utils import HwsimSkip
18
def check_set_tx_power(dev, apdev):
    """Skip the calling test unless RSSI overrides are visible in scan results.

    An open AP with SSID 'check_tx_power' is started on apdev[0]. The AP first
    gets set_rx_rssi(-50) and dev[0]'s SCAN_RESULTS output must contain '-50';
    then reset_rx_rssi() is applied and the output must contain '-30'.

    Raises:
        HwsimSkip: if either expected value is missing from SCAN_RESULTS.
    """
    ap = hostapd.add_ap(apdev[0], {'ssid': 'check_tx_power'})

    # Each phase: (how to adjust the AP's RSSI, text expected in SCAN_RESULTS)
    phases = (
        (lambda: set_rx_rssi(ap, -50), '-50'),
        (lambda: reset_rx_rssi(ap), '-30'),
    )

    for adjust_rssi, expected in phases:
        adjust_rssi()

        sta = dev[0]
        sta.scan(freq=2412)
        # Allow up to 2 seconds for the scan results event; its value is not
        # needed, only the SCAN_RESULTS output fetched afterwards.
        sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 2)

        if expected not in sta.request("SCAN_RESULTS"):
            raise HwsimSkip('set_rx_rssi not supported')
38
def run_rssi_based_assoc_rej_timeout(dev, apdev, params):
    """Check that a rejected STA does not retry association before the timeout.

    Starts an AP on apdev[0] with rssi_reject_assoc_rssi=-45 and
    rssi_reject_assoc_timeout=5, sets dev[0]'s RX RSSI to -50 and starts a
    connection attempt. The first attempt must be rejected with
    status_code=34. After that, with SCAN_INTERVAL set to 1, the next
    CTRL-EVENT-CONNECTED / CTRL-EVENT-ASSOC-REJECT event must not arrive
    before the retry timeout has elapsed, must arrive within the timeout plus
    2 seconds, and must not be a successful connection.

    Args:
        dev: list of station devices; only dev[0] is used.
        apdev: list of AP device descriptors; only apdev[0] is used.
        params: test parameters; not used by this function.

    Raises:
        Exception: on any deviation from the expected sequence above.
    """
    rssi_retry_to = 5

    ap_params = {'ssid': "test-RSSI-ar-to",
                 'rssi_reject_assoc_rssi': '-45',
                 'rssi_reject_assoc_timeout': str(rssi_retry_to)}

    logger.info("Set APs RSSI rejection threshold to -45 dBm, retry timeout: " +
                str(rssi_retry_to))
    hapd = hostapd.add_ap(apdev[0], ap_params)

    logger.info("Set STAs TX RSSI to -50")
    set_rx_rssi(dev[0], -50)

    logger.info("STA is trying to connect")
    dev[0].connect("test-RSSI-ar-to", key_mgmt="NONE", scan_freq="2412",
                   wait_connect=False)

    ev = dev[0].wait_event(['CTRL-EVENT-ASSOC-REJECT'], 2)
    if ev is None:
        raise Exception("Association not rejected")
    if 'status_code=34' not in ev:
        raise Exception("STA assoc request was not rejected with status code 34: " + ev)
    # Reference point for measuring how long the STA stays quiet
    t_rej = datetime.now()

    # Set the scan interval to make dev[0] look for connections
    if 'OK' not in dev[0].request("SCAN_INTERVAL 1"):
        raise Exception("Failed to set scan interval")

    logger.info("Validate that STA did not connect or sent assoc request within retry timeout")
    ev = dev[0].wait_event(['CTRL-EVENT-CONNECTED', 'CTRL-EVENT-ASSOC-REJECT'],
                           rssi_retry_to + 2)
    t_ev = datetime.now()

    # Handle the "no event at all" case before inspecting the event contents:
    # a membership test on a None result would raise TypeError and hide the
    # real failure reason.
    if not ev:
        raise Exception("STA didn't send association request after retry timeout!")

    # Any event earlier than the retry timeout means the STA retried too soon
    if (t_ev - t_rej).total_seconds() < rssi_retry_to:
        raise Exception("STA sent assoc request within retry timeout")

    if 'CTRL-EVENT-CONNECTED' in ev:
        raise Exception("STA connected with low RSSI")
81
def test_rssi_based_assoc_rej_timeout(dev, apdev, params):
    """RSSI-based association rejection: no assoc request during retry timeout"""
    # Raises HwsimSkip when RSSI overrides do not show up in scan results
    check_set_tx_power(dev, apdev)

    try:
        run_rssi_based_assoc_rej_timeout(dev, apdev, params)
    finally:
        # Runs on success and failure alike: clear the station's RSSI override
        # and set its scan interval to 5.
        sta = dev[0]
        reset_rx_rssi(sta)
        sta.request("SCAN_INTERVAL 5")
90
def run_rssi_based_assoc_rej_good_rssi(dev, apdev):
    """Connect dev[0] to an RSSI-rejecting AP while its RSSI is set to -45.

    The AP on apdev[0] uses rssi_reject_assoc_rssi=-45 and
    rssi_reject_assoc_timeout=60. dev[0]'s RX RSSI is set to -45 and a
    connection to the open network is then requested with connect() using its
    default waiting behavior.
    """
    ssid = "test-RSSI-ar-to"
    ap_config = {
        'ssid': ssid,
        'rssi_reject_assoc_rssi': '-45',
        'rssi_reject_assoc_timeout': '60',
    }

    logger.info("Set APs RSSI rejection threshold to -45 dBm")
    ap = hostapd.add_ap(apdev[0], ap_config)

    logger.info("Set STAs TX RSSI to -45")
    sta = dev[0]
    set_rx_rssi(sta, -45)

    logger.info("STA is trying to connect")
    dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
104
def test_rssi_based_assoc_rej_good_rssi(dev, apdev):
    """RSSI-based association rejection: STA with RSSI above the threshold connects"""
    # Raises HwsimSkip when RSSI overrides do not show up in scan results
    check_set_tx_power(dev, apdev)

    try:
        run_rssi_based_assoc_rej_good_rssi(dev, apdev)
    finally:
        # Clear the station's RSSI override whether or not the test passed
        sta = dev[0]
        reset_rx_rssi(sta)
112
def run_rssi_based_assoc_rssi_change(dev, hapd):
    """Check that dev[0] connects only after its RSSI is raised.

    With both dev[0] and the AP set to RSSI -50 and SCAN_INTERVAL set to 1, a
    connection attempt to "test-RSSI-ar-to" must not complete within
    2 seconds. After both sides are set to -40, the connection must complete
    within 2 seconds.

    Args:
        dev: list of station devices; only dev[0] is used.
        hapd: AP object whose RSSI is adjusted together with the station's.

    Raises:
        Exception: if the scan interval cannot be set, if the STA connects
            while the RSSI is -50, or (propagated from wait_completed()) if
            it does not connect after the RSSI is raised.
    """
    logger.info("Set STAs and APs TX RSSI to -50")
    set_rx_rssi(dev[0], -50)
    set_rx_rssi(hapd, -50)

    # Set the scan interval to make dev[0] look for connections
    if 'OK' not in dev[0].request("SCAN_INTERVAL 1"):
        raise Exception("Failed to set scan interval")

    logger.info("STA is trying to connect")
    dev[0].connect("test-RSSI-ar-to", key_mgmt="NONE", scan_freq="2412",
                   wait_connect=False)

    try:
        dev[0].wait_completed(2)
    except Exception:
        # A failure to complete is the expected outcome here. Catch Exception
        # rather than everything so that KeyboardInterrupt/SystemExit still
        # abort the test run.
        logger.info("STA didn't connect after 2 seconds.")
    else:
        raise Exception("STA connected with low RSSI")

    logger.info("Set STAs and APs TX RSSI to -40dBm, validate that STA connects")
    set_rx_rssi(dev[0], -40)
    set_rx_rssi(hapd, -40)

    # No handler here: a failure to connect now must fail the test
    dev[0].wait_completed(2)
138
def test_rssi_based_assoc_rssi_change(dev, apdev):
    """RSSI-based association rejection: connect after improving RSSI"""
    check_set_tx_power(dev, apdev)

    # Bound before the try block so that the cleanup below can tell whether
    # the AP was actually started; otherwise a failure in add_ap() would make
    # the finally clause fail on an unbound name and mask the real error.
    hapd = None
    try:
        ap_params = {'ssid': "test-RSSI-ar-to",
                     'rssi_reject_assoc_rssi': '-45',
                     'rssi_reject_assoc_timeout': '60'}

        logger.info("Set APs RSSI rejection threshold to -45 dBm, retry timeout: 60")
        hapd = hostapd.add_ap(apdev[0], ap_params)

        run_rssi_based_assoc_rssi_change(dev, hapd)
    finally:
        # Clear RSSI overrides on both sides and set the scan interval to 5
        reset_rx_rssi(dev[0])
        if hapd is not None:
            reset_rx_rssi(hapd)
        dev[0].request("SCAN_INTERVAL 5")
155
def test_oce_ap(dev, apdev):
    """OCE AP"""
    passphrase = 'qwertyuiop'
    ssid = "test-oce"

    # Start from the WPA2 parameter set and layer the extra settings on top
    ap_config = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    for key, value in (('ieee80211w', "1"), ('mbo', "1"), ('oce', "4")):
        ap_config[key] = value

    ap = hostapd.add_ap(apdev[0], ap_config)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, ieee80211w="1", scan_freq="2412")
166
def test_oce_ap_open(dev, apdev):
    """OCE AP (open)"""
    ssid = "test-oce"

    # AP configuration with only the SSID plus the mbo and oce settings
    ap_config = {"ssid": ssid, 'mbo': "1", 'oce': "4"}
    ap = hostapd.add_ap(apdev[0], ap_config)

    sta = dev[0]
    sta.connect(ssid, key_mgmt="NONE", scan_freq="2412")
175
def test_oce_ap_open_connect_cmd(dev, apdev):
    """OCE AP (open, connect command)"""
    ssid = "test-oce"

    # AP configuration with only the SSID plus the mbo and oce settings
    ap_config = {"ssid": ssid, 'mbo': "1", 'oce': "4"}
    ap = hostapd.add_ap(apdev[0], ap_config)

    # Use a separately added wlan5 interface (with the force_connect_cmd=1
    # driver parameter) instead of the stations passed in via dev.
    sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    sta.interface_add("wlan5", drv_params="force_connect_cmd=1")
    sta.connect(ssid, key_mgmt="NONE", scan_freq="2412")
186