1# HE tests
2# Copyright (c) 2019, The Linux Foundation
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import logging
8logger = logging.getLogger()
9import os
10import subprocess, time
11
12import hwsim_utils
13import hostapd
14from wpasupplicant import WpaSupplicant
15from utils import *
16from test_dfs import wait_dfs_event
17from test_ap_acs import wait_acs
18
19def test_he_open(dev, apdev):
20    """HE AP with open mode configuration"""
21    params = {"ssid": "he",
22              "ieee80211ax": "1",
23              "he_bss_color": "42",
24              "he_mu_edca_ac_be_ecwmin": "7",
25              "he_mu_edca_ac_be_ecwmax": "15"}
26    hapd = hostapd.add_ap(apdev[0], params)
27    if hapd.get_status_field("ieee80211ax") != "1":
28        raise Exception("STATUS did not indicate ieee80211ax=1")
29    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
30    sta = hapd.get_sta(dev[0].own_addr())
31    if "[HE]" not in sta['flags']:
32        raise Exception("Missing STA flag: HE")
33
34def test_he_disabled_on_sta(dev, apdev):
35    """HE AP and HE disabled on STA"""
36    params = {"ssid": "he",
37              "ieee80211ax": "1",
38              "he_bss_color": "42",
39              "he_mu_edca_ac_be_ecwmin": "7",
40              "he_mu_edca_ac_be_ecwmax": "15"}
41    hapd = hostapd.add_ap(apdev[0], params)
42    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412", disable_he="1")
43    sta = hapd.get_sta(dev[0].own_addr())
44    if "[HE]" in sta['flags']:
45        raise Exception("Unexpected STA flag: HE")
46
47def test_he_params(dev, apdev):
48    """HE AP parameters"""
49    params = {"ssid": "he",
50              "ieee80211ax": "1",
51              "he_bss_color": "42",
52              "he_mu_edca_ac_be_ecwmin": "7",
53              "he_mu_edca_ac_be_ecwmax": "15",
54              "he_su_beamformer": "0",
55              "he_su_beamformee": "0",
56              "he_default_pe_duration": "4",
57              "he_twt_required": "1",
58              "he_rts_threshold": "64",
59              "he_basic_mcs_nss_set": "65535",
60              "he_mu_edca_qos_info_param_count": "0",
61              "he_mu_edca_qos_info_q_ack": "0",
62              "he_mu_edca_qos_info_queue_request": "1",
63              "he_mu_edca_qos_info_txop_request": "0",
64              "he_mu_edca_ac_be_aifsn": "0",
65              "he_mu_edca_ac_be_ecwmin": "15",
66              "he_mu_edca_ac_be_ecwmax": "15",
67              "he_mu_edca_ac_be_timer": "255",
68              "he_mu_edca_ac_bk_aifsn": "0",
69              "he_mu_edca_ac_bk_aci": "1",
70              "he_mu_edca_ac_bk_ecwmin": "15",
71              "he_mu_edca_ac_bk_ecwmax": "15",
72              "he_mu_edca_ac_bk_timer": "255",
73              "he_mu_edca_ac_vi_ecwmin": "15",
74              "he_mu_edca_ac_vi_ecwmax": "15",
75              "he_mu_edca_ac_vi_aifsn": "0",
76              "he_mu_edca_ac_vi_aci": "2",
77              "he_mu_edca_ac_vi_timer": "255",
78              "he_mu_edca_ac_vo_aifsn": "0",
79              "he_mu_edca_ac_vo_aci": "3",
80              "he_mu_edca_ac_vo_ecwmin": "15",
81              "he_mu_edca_ac_vo_ecwmax": "15",
82              "he_mu_edca_ac_vo_timer": "255",
83              "he_spr_sr_control": "0",
84              "he_spr_non_srg_obss_pd_max_offset": "0",
85              "he_spr_srg_obss_pd_min_offset": "0",
86              "he_spr_srg_obss_pd_max_offset": "0",
87              "he_spr_srg_bss_colors": "1 2 10 63",
88              "he_spr_srg_partial_bssid": "0 1 3 63",
89              "he_6ghz_max_ampdu_len_exp": "7",
90              "he_6ghz_rx_ant_pat": "1",
91              "he_6ghz_tx_ant_pat": "1",
92              "he_6ghz_max_mpdu": "2",
93              "he_oper_chwidth": "0",
94              "he_oper_centr_freq_seg0_idx": "1",
95              "he_oper_centr_freq_seg1_idx": "0"}
96    hapd = hostapd.add_ap(apdev[0], params)
97    if hapd.get_status_field("ieee80211ax") != "1":
98        raise Exception("STATUS did not indicate ieee80211ax=1")
99    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
100
101def test_he_spr_params(dev, apdev):
102    """HE AP spatial reuse parameters"""
103    params = {"ssid": "he",
104              "ieee80211ax": "1",
105              "he_spr_sr_control": "12",
106              "he_spr_non_srg_obss_pd_max_offset": "1",
107              "he_spr_srg_obss_pd_min_offset": "2",
108              "he_spr_srg_obss_pd_max_offset": "3",
109              "he_spr_srg_bss_colors": "1 2 10 63",
110              "he_spr_srg_partial_bssid": "0 1 3 63",
111              "he_oper_chwidth": "0",
112              "he_oper_centr_freq_seg0_idx": "1",
113              "he_oper_centr_freq_seg1_idx": "0"}
114    hapd = hostapd.add_ap(apdev[0], params)
115    if hapd.get_status_field("ieee80211ax") != "1":
116        raise Exception("STATUS did not indicate ieee80211ax=1")
117    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
118
119def he_supported():
120    cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
121    reg = cmd.stdout.read().decode()
122    if "@ 80)" in reg or "@ 160)" in reg:
123        return True
124    return False
125
126def test_he80(dev, apdev):
127    """HE with 80 MHz channel width"""
128    try:
129        hapd = None
130        params = {"ssid": "he",
131                  "country_code": "FI",
132                  "hw_mode": "a",
133                  "channel": "36",
134                  "ht_capab": "[HT40+]",
135                  "ieee80211n": "1",
136                  "ieee80211ac": "1",
137                  "ieee80211ax": "1",
138                  "vht_oper_chwidth": "1",
139                  "vht_capab": "[MAX-MPDU-11454]",
140                  "vht_oper_centr_freq_seg0_idx": "42",
141                  "he_oper_chwidth": "1",
142                  "he_oper_centr_freq_seg0_idx": "42"}
143        hapd = hostapd.add_ap(apdev[0], params)
144        bssid = apdev[0]['bssid']
145
146        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
147        hwsim_utils.test_connectivity(dev[0], hapd)
148        sig = dev[0].request("SIGNAL_POLL").splitlines()
149        if "FREQUENCY=5180" not in sig:
150            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
151        if "WIDTH=80 MHz" not in sig:
152            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
153        est = dev[0].get_bss(bssid)['est_throughput']
154        if est != "600502":
155            raise Exception("Unexpected BSS est_throughput: " + est)
156        status = dev[0].get_status()
157        if status["ieee80211ac"] != "1":
158            raise Exception("Unexpected STATUS ieee80211ac value (STA)")
159        status = hapd.get_status()
160        logger.info("hostapd STATUS: " + str(status))
161        if status["ieee80211n"] != "1":
162            raise Exception("Unexpected STATUS ieee80211n value")
163        if status["ieee80211ac"] != "1":
164            raise Exception("Unexpected STATUS ieee80211ac value")
165        if status["ieee80211ax"] != "1":
166            raise Exception("Unexpected STATUS ieee80211ax value")
167        if status["secondary_channel"] != "1":
168            raise Exception("Unexpected STATUS secondary_channel value")
169        if status["vht_oper_chwidth"] != "1":
170            raise Exception("Unexpected STATUS vht_oper_chwidth value")
171        if status["vht_oper_centr_freq_seg0_idx"] != "42":
172            raise Exception("Unexpected STATUS vht_oper_centr_freq_seg0_idx value")
173        if "vht_caps_info" not in status:
174            raise Exception("Missing vht_caps_info")
175        if status["he_oper_chwidth"] != "1":
176            raise Exception("Unexpected STATUS he_oper_chwidth value")
177        if status["he_oper_centr_freq_seg0_idx"] != "42":
178            raise Exception("Unexpected STATUS he_oper_centr_freq_seg0_idx value")
179
180        sta = hapd.get_sta(dev[0].own_addr())
181        logger.info("hostapd STA: " + str(sta))
182        if "[HT]" not in sta['flags']:
183            raise Exception("Missing STA flag: HT")
184        if "[VHT]" not in sta['flags']:
185            raise Exception("Missing STA flag: VHT")
186        if "[HE]" not in sta['flags']:
187            raise Exception("Missing STA flag: HE")
188
189    except Exception as e:
190        if isinstance(e, Exception) and str(e) == "AP startup failed":
191            if not he_supported():
192                raise HwsimSkip("80 MHz channel not supported in regulatory information")
193        raise
194    finally:
195        dev[0].request("DISCONNECT")
196        clear_regdom(hapd, dev)
197
198def _test_he_wifi_generation(dev, apdev, conf, scan_freq):
199    try:
200        hapd = None
201        params = {"ssid": "he",
202                  "country_code": "FI",
203                  "ieee80211n": "1",
204                  "ieee80211ax": "1"}
205        params.update(conf)
206        hapd = hostapd.add_ap(apdev[0], params)
207        bssid = apdev[0]['bssid']
208
209        dev[0].connect("he", key_mgmt="NONE", scan_freq=scan_freq)
210        status = dev[0].get_status()
211        if 'wifi_generation' not in status:
212            # For now, assume this is because of missing kernel support
213            raise HwsimSkip("Association Request IE reporting not supported")
214            #raise Exception("Missing wifi_generation information")
215        if status['wifi_generation'] != "6":
216            raise Exception("Unexpected wifi_generation value: " + status['wifi_generation'])
217
218        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
219        wpas.interface_add("wlan5", drv_params="force_connect_cmd=1")
220        wpas.connect("he", key_mgmt="NONE", scan_freq=scan_freq)
221        status = wpas.get_status()
222        if 'wifi_generation' not in status:
223            # For now, assume this is because of missing kernel support
224            raise HwsimSkip("Association Request IE reporting not supported")
225            #raise Exception("Missing wifi_generation information (connect)")
226        if status['wifi_generation'] != "6":
227            raise Exception("Unexpected wifi_generation value (connect): " + status['wifi_generation'])
228    except Exception as e:
229        if isinstance(e, Exception) and str(e) == "AP startup failed":
230            if not he_supported():
231                raise HwsimSkip("80 MHz channel not supported in regulatory information")
232        raise
233    finally:
234        dev[0].request("DISCONNECT")
235        clear_regdom(hapd, dev)
236
237def test_he_wifi_generation(dev, apdev):
238    """HE and wifi_generation (5 GHz)"""
239    conf = {
240        "vht_oper_chwidth": "1",
241        "hw_mode": "a",
242        "channel": "36",
243        "ht_capab": "[HT40+]",
244        "vht_oper_centr_freq_seg0_idx": "42",
245        "he_oper_chwidth": "1",
246        "he_oper_centr_freq_seg0_idx": "42",
247        "vht_capab": "[MAX-MPDU-11454]",
248        "ieee80211ac": "1",
249    }
250    _test_he_wifi_generation(dev, apdev, conf, "5180")
251
252def test_he_wifi_generation_24(dev, apdev):
253    """HE and wifi_generation (2.4 GHz)"""
254    conf = {
255        "hw_mode": "g",
256        "channel": "1",
257    }
258    _test_he_wifi_generation(dev, apdev, conf, "2412")
259
260def he80_test(apdev, dev, channel, ht_capab):
261    clear_scan_cache(apdev)
262    try:
263        hapd = None
264        params = {"ssid": "he",
265                  "country_code": "FI",
266                  "hw_mode": "a",
267                  "channel": str(channel),
268                  "ht_capab": ht_capab,
269                  "ieee80211n": "1",
270                  "ieee80211ac": "1",
271                  "ieee80211ax": "1",
272                  "vht_oper_chwidth": "1",
273                  "vht_oper_centr_freq_seg0_idx": "42",
274                  "he_oper_chwidth": "1",
275                  "he_oper_centr_freq_seg0_idx": "42"}
276        hapd = hostapd.add_ap(apdev, params)
277        bssid = apdev['bssid']
278
279        dev[0].connect("he", key_mgmt="NONE",
280                       scan_freq=str(5000 + 5 * channel))
281        hwsim_utils.test_connectivity(dev[0], hapd)
282    except Exception as e:
283        if isinstance(e, Exception) and str(e) == "AP startup failed":
284            if not he_supported():
285                raise HwsimSkip("80 MHz channel not supported in regulatory information")
286        raise
287    finally:
288        clear_regdom(hapd, dev)
289
290def test_he80b(dev, apdev):
291    """HE with 80 MHz channel width (HT40- channel 40)"""
292    he80_test(apdev[0], dev, 40, "[HT40-]")
293
294def test_he80c(dev, apdev):
295    """HE with 80 MHz channel width (HT40+ channel 44)"""
296    he80_test(apdev[0], dev, 44, "[HT40+]")
297
298def test_he80d(dev, apdev):
299    """HE with 80 MHz channel width (HT40- channel 48)"""
300    he80_test(apdev[0], dev, 48, "[HT40-]")
301
302def test_he80_params(dev, apdev):
303    """HE with 80 MHz channel width and number of optional features enabled"""
304    try:
305        hapd = None
306        params = {"ssid": "he",
307                  "country_code": "FI",
308                  "hw_mode": "a",
309                  "channel": "36",
310                  "ht_capab": "[HT40+][SHORT-GI-40][DSS_CCK-40]",
311                  "ieee80211n": "1",
312                  "ieee80211ac": "1",
313                  "ieee80211ax": "1",
314                  "vht_oper_chwidth": "1",
315                  "vht_capab": "[MAX-MPDU-11454][RXLDPC][SHORT-GI-80][TX-STBC-2BY1][RX-STBC-1][MAX-A-MPDU-LEN-EXP0]",
316                  "vht_oper_centr_freq_seg0_idx": "42",
317                  "require_vht": "1",
318                  "require_he": "1",
319                  "he_oper_chwidth": "1",
320                  "he_oper_centr_freq_seg0_idx": "42",
321                  "he_su_beamformer": "1",
322                  "he_mu_beamformer": "1",
323                  "he_bss_color":"1",
324                  "he_default_pe_duration":"1",
325                  "he_twt_required":"1",
326                  "he_rts_threshold":"1"}
327        hapd = hostapd.add_ap(apdev[0], params)
328
329        dev[1].connect("he", key_mgmt="NONE", scan_freq="5180",
330                       disable_vht="1", wait_connect=False)
331        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
332        dev[2].connect("he", key_mgmt="NONE", scan_freq="5180",
333                       disable_sgi="1")
334        ev = dev[1].wait_event(["CTRL-EVENT-ASSOC-REJECT"])
335        if ev is None:
336            raise Exception("Association rejection timed out")
337        if "status_code=104" not in ev:
338            raise Exception("Unexpected rejection status code")
339        dev[1].request("DISCONNECT")
340        dev[1].request("REMOVE_NETWORK all")
341        dev[1].dump_monitor()
342        dev[1].connect("he", key_mgmt="NONE", scan_freq="5180",
343                       disable_he="1", wait_connect=False)
344        hwsim_utils.test_connectivity(dev[0], hapd)
345        sta0 = hapd.get_sta(dev[0].own_addr())
346        sta2 = hapd.get_sta(dev[2].own_addr())
347        capab0 = int(sta0['vht_caps_info'], base=16)
348        capab2 = int(sta2['vht_caps_info'], base=16)
349        if capab0 & 0x60 == 0:
350            raise Exception("dev[0] did not support SGI")
351        if capab2 & 0x60 != 0:
352            raise Exception("dev[2] claimed support for SGI")
353        ev = dev[1].wait_event(["CTRL-EVENT-ASSOC-REJECT"])
354        if ev is None:
355            raise Exception("Association rejection timed out (2)")
356        if "status_code=124" not in ev:
357            raise Exception("Unexpected rejection status code (2): " + ev)
358    except Exception as e:
359        if isinstance(e, Exception) and str(e) == "AP startup failed":
360            if not he_supported():
361                raise HwsimSkip("80 MHz channel not supported in regulatory information")
362        raise
363    finally:
364        clear_regdom(hapd, dev, count=3)
365
366def test_he80_invalid(dev, apdev):
367    """HE with invalid 80 MHz channel configuration (seg1)"""
368    try:
369        hapd = None
370        params = {"ssid": "he",
371                  "country_code": "US",
372                  "hw_mode": "a",
373                  "channel": "36",
374                  "ht_capab": "[HT40+]",
375                  "ieee80211n": "1",
376                  "ieee80211ac": "1",
377                  "ieee80211ax": "1",
378                  "vht_oper_chwidth": "1",
379                  "vht_oper_centr_freq_seg0_idx": "42",
380                  "vht_oper_centr_freq_seg1_idx": "159",
381                  "he_oper_chwidth": "1",
382                  "he_oper_centr_freq_seg0_idx": "42",
383                  "he_oper_centr_freq_seg1_idx": "155",
384                  'ieee80211d': '1',
385                  'ieee80211h': '1'}
386        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
387        # This fails due to unexpected seg1 configuration
388        ev = hapd.wait_event(["AP-DISABLED"], timeout=5)
389        if ev is None:
390            raise Exception("AP-DISABLED not reported")
391    except Exception as e:
392        if isinstance(e, Exception) and str(e) == "AP startup failed":
393            if not he_supported():
394                raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
395        raise
396    finally:
397        clear_regdom(hapd, dev)
398
399def test_he80_invalid2(dev, apdev):
400    """HE with invalid 80 MHz channel configuration (seg0)"""
401    try:
402        hapd = None
403        params = {"ssid": "he",
404                  "country_code": "US",
405                  "hw_mode": "a",
406                  "channel": "36",
407                  "ht_capab": "[HT40+]",
408                  "ieee80211n": "1",
409                  "ieee80211ac": "1",
410                  "ieee80211ax": "1",
411                  "vht_oper_chwidth": "1",
412                  "vht_oper_centr_freq_seg0_idx": "42",
413                  "he_oper_chwidth": "1",
414                  "he_oper_centr_freq_seg0_idx": "46",
415                  'ieee80211d': '1',
416                  'ieee80211h': '1'}
417        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
418        # This fails due to invalid seg0 configuration
419        ev = hapd.wait_event(["AP-DISABLED"], timeout=5)
420        if ev is None:
421            raise Exception("AP-DISABLED not reported")
422    except Exception as e:
423        if isinstance(e, Exception) and str(e) == "AP startup failed":
424            if not he_supported():
425                raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
426        raise
427    finally:
428        clear_regdom(hapd, dev)
429
430def test_he_20(devs, apdevs):
431    """HE and 20 MHz channel"""
432    dev = devs[0]
433    ap = apdevs[0]
434    try:
435        hapd = None
436        params = {"ssid": "test-he20",
437                  "country_code": "DE",
438                  "hw_mode": "a",
439                  "channel": "36",
440                  "ieee80211n": "1",
441                  "ieee80211ac": "1",
442                  "ieee80211ax": "1",
443                  "ht_capab": "",
444                  "vht_capab": "",
445                  "vht_oper_chwidth": "0",
446                  "vht_oper_centr_freq_seg0_idx": "0",
447                  "supported_rates": "60 120 240 360 480 540",
448                  "require_vht": "1",
449                  "he_oper_chwidth": "0",
450                  "he_oper_centr_freq_seg0_idx": "0"}
451        hapd = hostapd.add_ap(ap, params)
452        dev.connect("test-he20", scan_freq="5180", key_mgmt="NONE")
453        hwsim_utils.test_connectivity(dev, hapd)
454    finally:
455        dev.request("DISCONNECT")
456        clear_regdom(hapd, devs)
457
458def test_he_40(devs, apdevs):
459    """HE and 40 MHz channel"""
460    dev = devs[0]
461    ap = apdevs[0]
462    try:
463        hapd = None
464        params = {"ssid": "test-he40",
465                  "country_code": "DE",
466                  "hw_mode": "a",
467                  "channel": "36",
468                  "ieee80211n": "1",
469                  "ieee80211ac": "1",
470                  "ieee80211ax": "1",
471                  "ht_capab": "[HT40+]",
472                  "vht_capab": "",
473                  "vht_oper_chwidth": "0",
474                  "vht_oper_centr_freq_seg0_idx": "38",
475                  "he_oper_chwidth": "0",
476                  "he_oper_centr_freq_seg0_idx": "38",
477                  "he_su_beamformer": "1",
478                  "he_mu_beamformer": "1"}
479        hapd = hostapd.add_ap(ap, params)
480        dev.connect("test-he40", scan_freq="5180", key_mgmt="NONE")
481        hwsim_utils.test_connectivity(dev, hapd)
482    finally:
483        dev.request("DISCONNECT")
484        clear_regdom(hapd, devs)
485
486@long_duration_test
487def test_he160(dev, apdev):
488    """HE with 160 MHz channel width (1)"""
489    try:
490        hapd = None
491        params = {"ssid": "he",
492                  "country_code": "FI",
493                  "hw_mode": "a",
494                  "channel": "36",
495                  "ht_capab": "[HT40+]",
496                  "vht_capab": "[VHT160]",
497                  "ieee80211n": "1",
498                  "ieee80211ac": "1",
499                  "ieee80211ax": "1",
500                  "vht_oper_chwidth": "2",
501                  "vht_oper_centr_freq_seg0_idx": "50",
502                  "he_oper_chwidth": "2",
503                  "he_oper_centr_freq_seg0_idx": "50",
504                  'ieee80211d': '1',
505                  'ieee80211h': '1'}
506        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
507        bssid = apdev[0]['bssid']
508
509        ev = wait_dfs_event(hapd, "DFS-CAC-START", 5)
510        if "DFS-CAC-START" not in ev:
511            raise Exception("Unexpected DFS event")
512
513        state = hapd.get_status_field("state")
514        if state != "DFS":
515            if state == "DISABLED" and not os.path.exists("dfs"):
516                # Not all systems have recent enough CRDA version and
517                # wireless-regdb changes to support 160 MHz and DFS. For now,
518                # do not report failures for this test case.
519                raise HwsimSkip("CRDA or wireless-regdb did not support 160 MHz")
520            raise Exception("Unexpected interface state: " + state)
521
522        logger.info("Waiting for CAC to complete")
523
524        ev = wait_dfs_event(hapd, "DFS-CAC-COMPLETED", 70)
525        if "success=1" not in ev:
526            raise Exception("CAC failed")
527        if "freq=5180" not in ev:
528            raise Exception("Unexpected DFS freq result")
529
530        ev = hapd.wait_event(["AP-ENABLED"], timeout=5)
531        if not ev:
532            raise Exception("AP setup timed out")
533
534        state = hapd.get_status_field("state")
535        if state != "ENABLED":
536            raise Exception("Unexpected interface state")
537
538        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
539        dev[0].wait_regdom(country_ie=True)
540        hwsim_utils.test_connectivity(dev[0], hapd)
541        sig = dev[0].request("SIGNAL_POLL").splitlines()
542        if "FREQUENCY=5180" not in sig:
543            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
544        if "WIDTH=160 MHz" not in sig:
545            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
546        est = dev[0].get_bss(bssid)['est_throughput']
547        if est != "1201002":
548            raise Exception("Unexpected BSS est_throughput: " + est)
549    except Exception as e:
550        if isinstance(e, Exception) and str(e) == "AP startup failed":
551            if not he_supported():
552                raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
553        raise
554    finally:
555        if hapd:
556            hapd.request("DISABLE")
557        dev[0].disconnect_and_stop_scan()
558        subprocess.call(['iw', 'reg', 'set', '00'])
559        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
560        dev[0].flush_scan_cache()
561
562@long_duration_test
563def test_he160b(dev, apdev):
564    """HE with 160 MHz channel width (2)"""
565    try:
566        hapd = None
567
568        params = {"ssid": "he",
569                  "country_code": "FI",
570                  "hw_mode": "a",
571                  "channel": "104",
572                  "ht_capab": "[HT40-]",
573                  "vht_capab": "[VHT160]",
574                  "ieee80211n": "1",
575                  "ieee80211ac": "1",
576                  "ieee80211ax": "1",
577                  "vht_oper_chwidth": "2",
578                  "vht_oper_centr_freq_seg0_idx": "114",
579                  "he_oper_chwidth": "2",
580                  "he_oper_centr_freq_seg0_idx": "114",
581                  'ieee80211d': '1',
582                  'ieee80211h': '1'}
583        hapd = hostapd.add_ap(apdev[1], params, wait_enabled=False)
584
585        ev = wait_dfs_event(hapd, "DFS-CAC-START", 5)
586        if "DFS-CAC-START" not in ev:
587            raise Exception("Unexpected DFS event(2)")
588
589        state = hapd.get_status_field("state")
590        if state != "DFS":
591            if state == "DISABLED" and not os.path.exists("dfs"):
592                # Not all systems have recent enough CRDA version and
593                # wireless-regdb changes to support 160 MHz and DFS. For now,
594                # do not report failures for this test case.
595                raise HwsimSkip("CRDA or wireless-regdb did not support 160 MHz")
596            raise Exception("Unexpected interface state: " + state)
597
598        logger.info("Waiting for CAC to complete")
599
600        ev = wait_dfs_event(hapd, "DFS-CAC-COMPLETED", 70)
601        if "success=1" not in ev:
602            raise Exception("CAC failed(2)")
603        if "freq=5520" not in ev:
604            raise Exception("Unexpected DFS freq result(2)")
605
606        ev = hapd.wait_event(["AP-ENABLED"], timeout=5)
607        if not ev:
608            raise Exception("AP setup timed out(2)")
609
610        state = hapd.get_status_field("state")
611        if state != "ENABLED":
612            raise Exception("Unexpected interface state(2)")
613
614        freq = hapd.get_status_field("freq")
615        if freq != "5520":
616            raise Exception("Unexpected frequency(2)")
617
618        dev[0].connect("he", key_mgmt="NONE", scan_freq="5520")
619        dev[0].wait_regdom(country_ie=True)
620        hwsim_utils.test_connectivity(dev[0], hapd)
621        sig = dev[0].request("SIGNAL_POLL").splitlines()
622        if "FREQUENCY=5520" not in sig:
623            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
624        if "WIDTH=160 MHz" not in sig:
625            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
626    except Exception as e:
627        if isinstance(e, Exception) and str(e) == "AP startup failed":
628            if not he_supported():
629                raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
630        raise
631    finally:
632        if hapd:
633            hapd.request("DISABLE")
634        dev[0].disconnect_and_stop_scan()
635        subprocess.call(['iw', 'reg', 'set', '00'])
636        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
637        dev[0].flush_scan_cache()
638
639def test_he160_no_dfs_100_plus(dev, apdev):
640    """HE with 160 MHz channel width and no DFS (100 plus)"""
641    run_ap_he160_no_dfs(dev, apdev, "100", "[HT40+]")
642
643def test_he160_no_dfs(dev, apdev):
644    """HE with 160 MHz channel width and no DFS (104 minus)"""
645    run_ap_he160_no_dfs(dev, apdev, "104", "[HT40-]")
646
647def test_he160_no_dfs_108_plus(dev, apdev):
648    """HE with 160 MHz channel width and no DFS (108 plus)"""
649    run_ap_he160_no_dfs(dev, apdev, "108", "[HT40+]")
650
651def test_he160_no_dfs_112_minus(dev, apdev):
652    """HE with 160 MHz channel width and no DFS (112 minus)"""
653    run_ap_he160_no_dfs(dev, apdev, "112", "[HT40-]")
654
655def test_he160_no_dfs_116_plus(dev, apdev):
656    """HE with 160 MHz channel width and no DFS (116 plus)"""
657    run_ap_he160_no_dfs(dev, apdev, "116", "[HT40+]")
658
659def test_he160_no_dfs_120_minus(dev, apdev):
660    """HE with 160 MHz channel width and no DFS (120 minus)"""
661    run_ap_he160_no_dfs(dev, apdev, "120", "[HT40-]")
662
663def test_he160_no_dfs_124_plus(dev, apdev):
664    """HE with 160 MHz channel width and no DFS (124 plus)"""
665    run_ap_he160_no_dfs(dev, apdev, "124", "[HT40+]")
666
667def test_he160_no_dfs_128_minus(dev, apdev):
668    """HE with 160 MHz channel width and no DFS (128 minus)"""
669    run_ap_he160_no_dfs(dev, apdev, "128", "[HT40-]")
670
671def run_ap_he160_no_dfs(dev, apdev, channel, ht_capab):
672    try:
673        hapd = None
674        params = {"ssid": "he",
675                  "country_code": "ZA",
676                  "hw_mode": "a",
677                  "channel": channel,
678                  "ht_capab": ht_capab,
679                  "vht_capab": "[VHT160]",
680                  "ieee80211n": "1",
681                  "ieee80211ac": "1",
682                  "ieee80211ax": "1",
683                  "vht_oper_chwidth": "2",
684                  "vht_oper_centr_freq_seg0_idx": "114",
685                  "he_oper_chwidth": "2",
686                  "he_oper_centr_freq_seg0_idx": "114",
687                  'ieee80211d': '1',
688                  'ieee80211h': '1'}
689        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
690        ev = hapd.wait_event(["AP-ENABLED"], timeout=2)
691        if not ev:
692            cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
693            reg = cmd.stdout.readlines()
694            for r in reg:
695                if b"5490" in r and b"DFS" in r:
696                    raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
697            raise Exception("AP setup timed out")
698
699        freq = str(int(channel) * 5 + 5000)
700        dev[0].connect("he", key_mgmt="NONE", scan_freq=freq)
701        dev[0].wait_regdom(country_ie=True)
702        hwsim_utils.test_connectivity(dev[0], hapd)
703        sig = dev[0].request("SIGNAL_POLL").splitlines()
704        if "FREQUENCY=" + freq not in sig:
705            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
706        if "WIDTH=160 MHz" not in sig:
707            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
708    except Exception as e:
709        if isinstance(e, Exception) and str(e) == "AP startup failed":
710            if not he_supported():
711                raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
712        raise
713    finally:
714        clear_regdom(hapd, dev)
715
716def test_he160_no_ht40(dev, apdev):
717    """HE with 160 MHz channel width and HT40 disabled"""
718    try:
719        hapd = None
720        params = {"ssid": "he",
721                  "country_code": "ZA",
722                  "hw_mode": "a",
723                  "channel": "108",
724                  "ht_capab": "",
725                  "ieee80211n": "1",
726                  "ieee80211ac": "1",
727                  "ieee80211ax": "1",
728                  "vht_oper_chwidth": "2",
729                  "vht_oper_centr_freq_seg0_idx": "114",
730                  "he_oper_chwidth": "2",
731                  "he_oper_centr_freq_seg0_idx": "114",
732                  'ieee80211d': '1',
733                  'ieee80211h': '1'}
734        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
735        ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=2)
736        if not ev:
737            cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
738            reg = cmd.stdout.readlines()
739            for r in reg:
740                if "5490" in r and "DFS" in r:
741                    raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
742            raise Exception("AP setup timed out")
743        if "AP-ENABLED" in ev:
744            # This was supposed to fail due to sec_channel_offset == 0
745            raise Exception("Unexpected AP-ENABLED")
746    except Exception as e:
747        if isinstance(e, Exception) and str(e) == "AP startup failed":
748            if not he_supported():
749                raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
750        raise
751    finally:
752        clear_regdom(hapd, dev)
753
754def test_he80plus80(dev, apdev):
755    """HE with 80+80 MHz channel width"""
756    try:
757        hapd = None
758        hapd2 = None
759        params = {"ssid": "he",
760                  "country_code": "US",
761                  "hw_mode": "a",
762                  "channel": "52",
763                  "ht_capab": "[HT40+]",
764                  "vht_capab": "[VHT160-80PLUS80]",
765                  "ieee80211n": "1",
766                  "ieee80211ac": "1",
767                  "ieee80211ax": "1",
768                  "vht_oper_chwidth": "3",
769                  "vht_oper_centr_freq_seg0_idx": "58",
770                  "vht_oper_centr_freq_seg1_idx": "155",
771                  "he_oper_chwidth": "3",
772                  "he_oper_centr_freq_seg0_idx": "58",
773                  "he_oper_centr_freq_seg1_idx": "155",
774                  'ieee80211d': '1',
775                  'ieee80211h': '1'}
776        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
777        # This will actually fail since DFS on 80+80 is not yet supported
778        ev = hapd.wait_event(["AP-DISABLED"], timeout=5)
779        # ignore result to avoid breaking the test once 80+80 DFS gets enabled
780
781        params = {"ssid": "he2",
782                  "country_code": "US",
783                  "hw_mode": "a",
784                  "channel": "36",
785                  "ht_capab": "[HT40+]",
786                  "vht_capab": "[VHT160-80PLUS80]",
787                  "ieee80211n": "1",
788                  "ieee80211ac": "1",
789                  "ieee80211ax": "1",
790                  "vht_oper_chwidth": "3",
791                  "vht_oper_centr_freq_seg0_idx": "42",
792                  "vht_oper_centr_freq_seg1_idx": "155",
793                  "he_oper_chwidth": "3",
794                  "he_oper_centr_freq_seg0_idx": "42",
795                  "he_oper_centr_freq_seg1_idx": "155"}
796        hapd2 = hostapd.add_ap(apdev[1], params, wait_enabled=False)
797
798        ev = hapd2.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=5)
799        if not ev:
800            raise Exception("AP setup timed out(2)")
801        if "AP-DISABLED" in ev:
802            # Assume this failed due to missing regulatory update for now
803            raise HwsimSkip("80+80 MHz channel not supported in regulatory information")
804
805        state = hapd2.get_status_field("state")
806        if state != "ENABLED":
807            raise Exception("Unexpected interface state(2)")
808
809        dev[1].connect("he2", key_mgmt="NONE", scan_freq="5180")
810        hwsim_utils.test_connectivity(dev[1], hapd2)
811        sig = dev[1].request("SIGNAL_POLL").splitlines()
812        if "FREQUENCY=5180" not in sig:
813            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
814        if "WIDTH=80+80 MHz" not in sig:
815            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
816        if "CENTER_FRQ1=5210" not in sig:
817            raise Exception("Unexpected SIGNAL_POLL value(3): " + str(sig))
818        if "CENTER_FRQ2=5775" not in sig:
819            raise Exception("Unexpected SIGNAL_POLL value(4): " + str(sig))
820    except Exception as e:
821        if isinstance(e, Exception) and str(e) == "AP startup failed":
822            if not he_supported():
823                raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
824        raise
825    finally:
826        dev[0].request("DISCONNECT")
827        dev[1].request("DISCONNECT")
828        if hapd:
829            hapd.request("DISABLE")
830        if hapd2:
831            hapd2.request("DISABLE")
832        subprocess.call(['iw', 'reg', 'set', '00'])
833        dev[0].flush_scan_cache()
834        dev[1].flush_scan_cache()
835
836def test_he80plus80_invalid(dev, apdev):
837    """HE with invalid 80+80 MHz channel"""
838    try:
839        hapd = None
840        params = {"ssid": "he",
841                  "country_code": "US",
842                  "hw_mode": "a",
843                  "channel": "36",
844                  "ht_capab": "[HT40+]",
845                  "ieee80211n": "1",
846                  "ieee80211ac": "1",
847                  "ieee80211ax": "1",
848                  "vht_oper_chwidth": "3",
849                  "vht_oper_centr_freq_seg0_idx": "42",
850                  "vht_oper_centr_freq_seg1_idx": "0",
851                  "he_oper_chwidth": "3",
852                  "he_oper_centr_freq_seg0_idx": "42",
853                  "he_oper_centr_freq_seg1_idx": "0",
854                  'ieee80211d': '1',
855                  'ieee80211h': '1'}
856        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
857        # This fails due to missing(invalid) seg1 configuration
858        ev = hapd.wait_event(["AP-DISABLED"], timeout=5)
859        if ev is None:
860            raise Exception("AP-DISABLED not reported")
861    except Exception as e:
862        if isinstance(e, Exception) and str(e) == "AP startup failed":
863            if not he_supported():
864                raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
865        raise
866    finally:
867        clear_regdom(hapd, dev)
868
869def test_he80_csa(dev, apdev):
870    """HE with 80 MHz channel width and CSA"""
871    csa_supported(dev[0])
872    try:
873        hapd = None
874        params = {"ssid": "he",
875                  "country_code": "US",
876                  "hw_mode": "a",
877                  "channel": "149",
878                  "ht_capab": "[HT40+]",
879                  "ieee80211n": "1",
880                  "ieee80211ac": "1",
881                  "ieee80211ax": "1",
882                  "vht_oper_chwidth": "1",
883                  "vht_oper_centr_freq_seg0_idx": "155",
884                  "he_oper_chwidth": "1",
885                  "he_oper_centr_freq_seg0_idx": "155"}
886        hapd = hostapd.add_ap(apdev[0], params)
887
888        dev[0].connect("he", key_mgmt="NONE", scan_freq="5745")
889        hwsim_utils.test_connectivity(dev[0], hapd)
890
891        hapd.request("CHAN_SWITCH 5 5180 ht vht he blocktx center_freq1=5210 sec_channel_offset=1 bandwidth=80")
892        ev = hapd.wait_event(["CTRL-EVENT-STARTED-CHANNEL-SWITCH"], timeout=10)
893        if ev is None:
894            raise Exception("Channel switch start event not seen")
895        if "freq=5180" not in ev:
896            raise Exception("Unexpected channel in CS started")
897        ev = hapd.wait_event(["CTRL-EVENT-CHANNEL-SWITCH"], timeout=10)
898        if ev is None:
899            raise Exception("Channel switch completion event not seen")
900        if "freq=5180" not in ev:
901            raise Exception("Unexpected channel in CS completed")
902        ev = hapd.wait_event(["AP-CSA-FINISHED"], timeout=10)
903        if ev is None:
904            raise Exception("CSA finished event timed out")
905        if "freq=5180" not in ev:
906            raise Exception("Unexpected channel in CSA finished event")
907        time.sleep(0.5)
908        hwsim_utils.test_connectivity(dev[0], hapd)
909
910        hapd.request("CHAN_SWITCH 5 5745")
911        ev = hapd.wait_event(["AP-CSA-FINISHED"], timeout=10)
912        if ev is None:
913            raise Exception("CSA finished event timed out")
914        if "freq=5745" not in ev:
915            raise Exception("Unexpected channel in CSA finished event")
916        time.sleep(0.5)
917        hwsim_utils.test_connectivity(dev[0], hapd)
918
919        # This CSA to same channel will fail in kernel, so use this only for
920        # extra code coverage.
921        hapd.request("CHAN_SWITCH 5 5745")
922        hapd.wait_event(["AP-CSA-FINISHED"], timeout=1)
923    except Exception as e:
924        if isinstance(e, Exception) and str(e) == "AP startup failed":
925            if not he_supported():
926                raise HwsimSkip("80 MHz channel not supported in regulatory information")
927        raise
928    finally:
929        dev[0].request("DISCONNECT")
930        clear_regdom(hapd, dev)
931
932def test_he_on_24ghz(dev, apdev):
933    """Subset of HE features on 2.4 GHz"""
934    hapd = None
935    params = {"ssid": "test-he-2g",
936              "hw_mode": "g",
937              "channel": "1",
938              "ieee80211n": "1",
939              "ieee80211ax": "1",
940              "vht_oper_chwidth": "0",
941              "vht_oper_centr_freq_seg0_idx": "1",
942              "he_oper_chwidth": "0",
943              "he_oper_centr_freq_seg0_idx": "1"}
944    hapd = hostapd.add_ap(apdev[0], params)
945    try:
946        dev[0].connect("test-he-2g", scan_freq="2412", key_mgmt="NONE")
947        hwsim_utils.test_connectivity(dev[0], hapd)
948        sta = hapd.get_sta(dev[0].own_addr())
949
950        dev[1].connect("test-he-2g", scan_freq="2412", key_mgmt="NONE")
951        sta = hapd.get_sta(dev[1].own_addr())
952
953    finally:
954        dev[0].request("DISCONNECT")
955        dev[1].request("DISCONNECT")
956        if hapd:
957            hapd.request("DISABLE")
958        subprocess.call(['iw', 'reg', 'set', '00'])
959        dev[0].flush_scan_cache()
960        dev[1].flush_scan_cache()
961
962def test_he80_pwr_constraint(dev, apdev):
963    """HE with 80 MHz channel width and local power constraint"""
964    hapd = None
965    try:
966        params = {"ssid": "he",
967                  "country_code": "FI",
968                  "hw_mode": "a",
969                  "channel": "36",
970                  "ht_capab": "[HT40+]",
971                  "ieee80211d": "1",
972                  "local_pwr_constraint": "3",
973                  "ieee80211n": "1",
974                  "ieee80211ac": "1",
975                  "ieee80211ax": "1",
976                  "vht_oper_chwidth": "1",
977                  "vht_oper_centr_freq_seg0_idx": "42",
978                  "he_oper_chwidth": "1",
979                  "he_oper_centr_freq_seg0_idx": "42"}
980        hapd = hostapd.add_ap(apdev[0], params)
981
982        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
983        dev[0].wait_regdom(country_ie=True)
984    except Exception as e:
985        if isinstance(e, Exception) and str(e) == "AP startup failed":
986            if not he_supported():
987                raise HwsimSkip("80 MHz channel not supported in regulatory information")
988        raise
989    finally:
990        if hapd:
991            hapd.request("DISABLE")
992        dev[0].disconnect_and_stop_scan()
993        subprocess.call(['iw', 'reg', 'set', '00'])
994        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
995        dev[0].flush_scan_cache()
996
997def test_he_use_sta_nsts(dev, apdev):
998    """HE with 80 MHz channel width and use_sta_nsts=1"""
999    try:
1000        hapd = None
1001        params = {"ssid": "he",
1002                  "country_code": "FI",
1003                  "hw_mode": "a",
1004                  "channel": "36",
1005                  "ht_capab": "[HT40+]",
1006                  "ieee80211n": "1",
1007                  "ieee80211ac": "1",
1008                  "ieee80211ax": "1",
1009                  "vht_oper_chwidth": "1",
1010                  "vht_oper_centr_freq_seg0_idx": "42",
1011                  "he_oper_chwidth": "1",
1012                  "he_oper_centr_freq_seg0_idx": "42",
1013                  "use_sta_nsts": "1"}
1014        hapd = hostapd.add_ap(apdev[0], params)
1015        bssid = apdev[0]['bssid']
1016
1017        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
1018        hwsim_utils.test_connectivity(dev[0], hapd)
1019    except Exception as e:
1020        if isinstance(e, Exception) and str(e) == "AP startup failed":
1021            if not he_supported():
1022                raise HwsimSkip("80 MHz channel not supported in regulatory information")
1023        raise
1024    finally:
1025        clear_regdom(hapd, dev)
1026
1027def test_he_tkip(dev, apdev):
1028    """HE and TKIP"""
1029    skip_without_tkip(dev[0])
1030    try:
1031        hapd = None
1032        params = {"ssid": "he",
1033                  "wpa": "1",
1034                  "wpa_key_mgmt": "WPA-PSK",
1035                  "wpa_pairwise": "TKIP",
1036                  "wpa_passphrase": "12345678",
1037                  "country_code": "FI",
1038                  "hw_mode": "a",
1039                  "channel": "36",
1040                  "ht_capab": "[HT40+]",
1041                  "ieee80211n": "1",
1042                  "ieee80211ac": "1",
1043                  "ieee80211ax": "1",
1044                  "vht_oper_chwidth": "1",
1045                  "vht_oper_centr_freq_seg0_idx": "42",
1046                  "he_oper_chwidth": "1",
1047                  "he_oper_centr_freq_seg0_idx": "42"}
1048        hapd = hostapd.add_ap(apdev[0], params)
1049        bssid = apdev[0]['bssid']
1050
1051        dev[0].connect("he", psk="12345678", scan_freq="5180")
1052        hwsim_utils.test_connectivity(dev[0], hapd)
1053        sig = dev[0].request("SIGNAL_POLL").splitlines()
1054        if "FREQUENCY=5180" not in sig:
1055            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
1056        if "WIDTH=20 MHz (no HT)" not in sig:
1057            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
1058        status = hapd.get_status()
1059        logger.info("hostapd STATUS: " + str(status))
1060        if status["ieee80211n"] != "0":
1061            raise Exception("Unexpected STATUS ieee80211n value")
1062        if status["ieee80211ac"] != "0":
1063            raise Exception("Unexpected STATUS ieee80211ac value")
1064        if status["ieee80211ax"] != "0":
1065            raise Exception("Unexpected STATUS ieee80211ax value")
1066        if status["secondary_channel"] != "0":
1067            raise Exception("Unexpected STATUS secondary_channel value")
1068    except Exception as e:
1069        if isinstance(e, Exception) and str(e) == "AP startup failed":
1070            if not he_supported():
1071                raise HwsimSkip("80 MHz channel not supported in regulatory information")
1072        raise
1073    finally:
1074        dev[0].request("DISCONNECT")
1075        clear_regdom(hapd, dev)
1076
1077def test_he_40_fallback_to_20(devs, apdevs):
1078    """HE and 40 MHz channel configuration falling back to 20 MHz"""
1079    dev = devs[0]
1080    ap = apdevs[0]
1081    try:
1082        hapd = None
1083        params = {"ssid": "test-he40",
1084                  "country_code": "US",
1085                  "hw_mode": "a",
1086                  "basic_rates": "60 120 240",
1087                  "channel": "161",
1088                  "ieee80211d": "1",
1089                  "ieee80211h": "1",
1090                  "ieee80211n": "1",
1091                  "ieee80211ac": "1",
1092                  "ieee80211ax": "1",
1093                  "ht_capab": "[HT40+][SHORT-GI-20][SHORT-GI-40][DSSS_CCK-40]",
1094                  "vht_capab": "[RXLDPC][SHORT-GI-80][TX-STBC-2BY1][RX-STBC1][MAX-MPDU-11454][MAX-A-MPDU-LEN-EXP7]",
1095                  "vht_oper_chwidth": "0",
1096                  "vht_oper_centr_freq_seg0_idx": "155",
1097                  "he_oper_chwidth": "0",
1098                  "he_oper_centr_freq_seg0_idx": "155"}
1099        hapd = hostapd.add_ap(ap, params)
1100        dev.connect("test-he40", scan_freq="5805", key_mgmt="NONE")
1101        dev.wait_regdom(country_ie=True)
1102        hwsim_utils.test_connectivity(dev, hapd)
1103    finally:
1104        clear_regdom(hapd, devs)
1105
1106def test_he80_to_24g_he(dev, apdev):
1107    """HE with 80 MHz channel width reconfigured to 2.4 GHz HE"""
1108    try:
1109        hapd = None
1110        params = {"ssid": "he",
1111                  "country_code": "FI",
1112                  "hw_mode": "a",
1113                  "channel": "36",
1114                  "ht_capab": "[HT40+]",
1115                  "ieee80211n": "1",
1116                  "ieee80211ac": "1",
1117                  "ieee80211ax": "1",
1118                  "vht_oper_chwidth": "1",
1119                  "vht_capab": "[MAX-MPDU-11454]",
1120                  "vht_oper_centr_freq_seg0_idx": "42",
1121                  "he_oper_chwidth": "1",
1122                  "he_oper_centr_freq_seg0_idx": "42"}
1123        hapd = hostapd.add_ap(apdev[0], params)
1124        bssid = apdev[0]['bssid']
1125
1126        hapd.disable()
1127        hapd.set("ieee80211ac", "0")
1128        hapd.set("hw_mode", "g")
1129        hapd.set("channel", "1")
1130        hapd.set("ht_capab", "")
1131        hapd.set("vht_capab", "")
1132        hapd.set("he_oper_chwidth", "")
1133        hapd.set("he_oper_centr_freq_seg0_idx", "")
1134        hapd.set("vht_oper_chwidth", "")
1135        hapd.set("vht_oper_centr_freq_seg0_idx", "")
1136        hapd.enable()
1137
1138        dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
1139    except Exception as e:
1140        if isinstance(e, Exception) and str(e) == "AP startup failed":
1141            if not he_supported():
1142                raise HwsimSkip("80 MHz channel not supported in regulatory information")
1143        raise
1144    finally:
1145        dev[0].request("DISCONNECT")
1146        clear_regdom(hapd, dev)
1147
1148def test_he_twt(dev, apdev):
1149    """HE and TWT"""
1150    params = {"ssid": "he",
1151              "ieee80211ax": "1",
1152              "he_bss_color": "42",
1153              "he_twt_required":"1"}
1154    hapd = hostapd.add_ap(apdev[0], params)
1155
1156    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
1157    if "OK" not in dev[0].request("TWT_SETUP"):
1158        raise Exception("TWT_SETUP failed")
1159    if "OK" not in dev[0].request("TWT_TEARDOWN"):
1160        raise Exception("TWT_SETUP failed")
1161    if "OK" not in dev[0].request("TWT_SETUP dialog=123 exponent=9 mantissa=10 min_twt=254 setup_cmd=1 twt=1234567890 requestor=1 trigger=0 implicit=0 flow_type=0 flow_id=2 protection=1 twt_channel=3 control=16"):
1162        raise Exception("TWT_SETUP failed")
1163    if "OK" not in dev[0].request("TWT_TEARDOWN flags=255"):
1164        raise Exception("TWT_SETUP failed")
1165
1166def test_he_6ghz(dev, apdev):
1167    """HE with 20 MHz channel width on 6 GHz"""
1168    check_sae_capab(dev[0])
1169
1170    try:
1171        dev[0].set("sae_pwe", "1")
1172        hapd = None
1173        params = {"ssid": "he",
1174                  "country_code": "DE",
1175                  "op_class": "131",
1176                  "channel": "5",
1177                  "ieee80211ax": "1",
1178                  "wpa": "2",
1179                  "rsn_pairwise": "CCMP",
1180                  "wpa_key_mgmt": "SAE",
1181                  "sae_pwe": "1",
1182                  "sae_password": "password",
1183                  "ieee80211w": "2"}
1184        hapd = hostapd.add_ap(apdev[0], params, set_channel=False)
1185        bssid = apdev[0]['bssid']
1186
1187        dev[0].set("sae_groups", "")
1188        dev[0].connect("he", sae_password="password", key_mgmt="SAE",
1189                       ieee80211w="2", scan_freq="5975")
1190        hwsim_utils.test_connectivity(dev[0], hapd)
1191        sig = dev[0].request("SIGNAL_POLL").splitlines()
1192        if "FREQUENCY=5975" not in sig:
1193            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
1194        if "WIDTH=20 MHz" not in sig:
1195            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
1196        status = dev[0].get_status()
1197        if 'wifi_generation' not in status:
1198            # For now, assume this is because of missing kernel support
1199            raise HwsimSkip("Association Request IE reporting not supported")
1200            #raise Exception("Missing wifi_generation information")
1201        if status['wifi_generation'] != "6":
1202            raise Exception("Unexpected wifi_generation value: " + status['wifi_generation'])
1203        status = hapd.get_status()
1204        logger.info("hostapd STATUS: " + str(status))
1205        if status["ieee80211ax"] != "1":
1206            raise Exception("Unexpected STATUS ieee80211ax value")
1207        if status["he_oper_chwidth"] != "0":
1208            raise Exception("Unexpected STATUS he_oper_chwidth value")
1209
1210        sta = hapd.get_sta(dev[0].own_addr())
1211        logger.info("hostapd STA: " + str(sta))
1212        if "[HE]" not in sta['flags']:
1213            raise Exception("Missing STA flag: HE")
1214
1215    except Exception as e:
1216        if isinstance(e, Exception) and str(e) == "AP startup failed":
1217            if not he_supported():
1218                raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
1219        raise
1220    finally:
1221        dev[0].request("DISCONNECT")
1222        dev[0].set("sae_pwe", "0")
1223        clear_regdom(hapd, dev)
1224
1225def test_he_6ghz_auto_security(dev, apdev):
1226    """HE on 6 GHz and automatic security settings on STA"""
1227    check_sae_capab(dev[0])
1228    try:
1229        hapd = None
1230        params = {"ssid": "he",
1231                  "country_code": "DE",
1232                  "op_class": "131",
1233                  "channel": "5",
1234                  "ieee80211ax": "1",
1235                  "wpa": "2",
1236                  "ieee80211w": "2",
1237                  "rsn_pairwise": "CCMP",
1238                  "wpa_key_mgmt": "SAE",
1239                  "sae_password": "password"}
1240        hapd = hostapd.add_ap(apdev[0], params, set_channel=False)
1241        bssid = apdev[0]['bssid']
1242
1243        dev[0].set("sae_groups", "")
1244        dev[0].connect("he", psk="password", key_mgmt="SAE WPA-PSK",
1245                       ieee80211w="1", scan_freq="5975")
1246        status = dev[0].get_status()
1247        if "pmf" not in status:
1248            raise Exception("pmf missing from status")
1249        if status["pmf"] != "2":
1250            raise Exception("Unexpected pmf status value: " + status["pmf"])
1251
1252        hapd.wait_sta()
1253        sta = hapd.get_sta(dev[0].own_addr())
1254        if sta["hostapdMFPR"] != "1":
1255            raise Exception("STA did not indicate MFPR=1")
1256    except Exception as e:
1257        if isinstance(e, Exception) and str(e) == "AP startup failed":
1258            if not he_supported():
1259                raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
1260        raise
1261    finally:
1262        dev[0].request("DISCONNECT")
1263        clear_regdom(hapd, dev)
1264
1265    params = hostapd.wpa2_params(ssid="he", passphrase="password")
1266    hapd = hostapd.add_ap(apdev[1], params)
1267    bssid = apdev[1]['bssid']
1268    dev[0].scan_for_bss(bssid, freq=2412)
1269    dev[0].request("RECONNECT")
1270    dev[0].wait_connected()
1271    status = dev[0].get_status()
1272    if "pmf" in status:
1273        raise Exception("Unexpected pmf status value(2): " + status["pmf"])
1274    hapd.wait_sta()
1275    sta = hapd.get_sta(dev[0].own_addr())
1276    if "[MFP]" in sta["flags"]:
1277        raise Exception("MFP reported unexpectedly(2)")
1278
1279def he_6ghz_acs(dev, apdev, op_class, bw):
1280    check_sae_capab(dev[0])
1281
1282    try:
1283        dev[0].set("sae_pwe", "1")
1284        hapd = None
1285        params = {"ssid": "he",
1286                  "country_code": "DE",
1287                  "op_class": str(op_class),
1288                  "hw_mode": "a",
1289                  "channel": "0",
1290                  "ieee80211ax": "1",
1291                  "wpa": "2",
1292                  "rsn_pairwise": "CCMP",
1293                  "wpa_key_mgmt": "SAE",
1294                  "sae_pwe": "1",
1295                  "sae_password": "password",
1296                  "ieee80211w": "2"}
1297        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
1298        wait_acs(hapd)
1299        bssid = apdev[0]['bssid']
1300
1301        freq = hapd.get_status_field("freq")
1302        if int(freq) < 5955:
1303            raise Exception("Unexpected frequency: " + freq)
1304
1305        sec = hapd.get_status_field("secondary_channel")
1306        if bw > 20 and int(sec) == 0:
1307            raise Exception("Secondary channel not set")
1308        if bw == 20 and int(sec) != 0:
1309            raise Exception("Secondary channel set")
1310
1311        dev[0].set("sae_groups", "")
1312        dev[0].connect("he", sae_password="password", key_mgmt="SAE",
1313                       ieee80211w="2", scan_freq=freq)
1314        hwsim_utils.test_connectivity(dev[0], hapd)
1315        sig = dev[0].request("SIGNAL_POLL").splitlines()
1316        if "FREQUENCY=" + freq not in sig:
1317            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
1318        if "WIDTH=" + str(bw) + " MHz" not in sig:
1319            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
1320    except Exception as e:
1321        if isinstance(e, Exception) and str(e) == "AP startup failed":
1322            if not he_supported():
1323                raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
1324        raise
1325    finally:
1326        dev[0].request("DISCONNECT")
1327        dev[0].set("sae_pwe", "0")
1328        clear_regdom(hapd, dev)
1329
1330def test_he_6ghz_acs_20mhz(dev, apdev):
1331    """HE with ACS on 6 GHz using a 20 MHz channel"""
1332    he_6ghz_acs(dev, apdev, 131, 20)
1333
1334def test_he_6ghz_acs_40mhz(dev, apdev):
1335    """HE with ACS on 6 GHz using a 40 MHz channel"""
1336    he_6ghz_acs(dev, apdev, 132, 40)
1337
1338def test_he_6ghz_acs_80mhz(dev, apdev):
1339    """HE with ACS on 6 GHz using an 80 MHz channel"""
1340    he_6ghz_acs(dev, apdev, 133, 80)
1341
1342def test_he_6ghz_acs_160mhz(dev, apdev):
1343    """HE with ACS on 6 GHz using a 160 MHz channel"""
1344    he_6ghz_acs(dev, apdev, 134, 160)
1345
1346def test_he_6ghz_security(dev, apdev):
1347    """HE AP and 6 GHz security parameter validation"""
1348    params = {"ssid": "he",
1349              "ieee80211ax": "1",
1350              "op_class": "131",
1351              "channel": "1"}
1352    hapd = hostapd.add_ap(apdev[0], params, no_enable=True)
1353
1354    # Pre-RSNA security methods are not allowed in 6 GHz
1355    if "FAIL" not in hapd.request("ENABLE"):
1356        raise Exception("Invalid configuration accepted(1)")
1357
1358    # Management frame protection is required in 6 GHz"
1359    hapd.set("wpa", "2")
1360    hapd.set("wpa_passphrase", "12345678")
1361    hapd.set("wpa_key_mgmt", "SAE")
1362    hapd.set("rsn_pairwise", "CCMP")
1363    hapd.set("ieee80211w", "1")
1364    if "FAIL" not in hapd.request("ENABLE"):
1365        raise Exception("Invalid configuration accepted(2)")
1366
1367    # Invalid AKM suite for 6 GHz
1368    hapd.set("ieee80211w", "2")
1369    hapd.set("wpa_key_mgmt", "SAE WPA-PSK")
1370    if "FAIL" not in hapd.request("ENABLE"):
1371        raise Exception("Invalid configuration accepted(3)")
1372
1373    # Invalid pairwise cipher suite for 6 GHz
1374    hapd.set("wpa_key_mgmt", "SAE")
1375    hapd.set("rsn_pairwise", "CCMP TKIP")
1376    if "FAIL" not in hapd.request("ENABLE"):
1377        raise Exception("Invalid configuration accepted(4)")
1378
1379    # Invalid group cipher suite for 6 GHz
1380    hapd.set("wpa_key_mgmt", "SAE")
1381    hapd.set("rsn_pairwise", "CCMP")
1382    hapd.set("group_cipher", "TKIP")
1383    if "FAIL" not in hapd.request("ENABLE"):
1384        raise Exception("Invalid configuration accepted(5)")
1385
1386def test_he_prefer_he20(dev, apdev):
1387    """Preference on HE20 over HT20"""
1388    params = {"ssid": "he",
1389              "channel": "1",
1390              "ieee80211ax": "0",
1391              "ieee80211n": "1"}
1392    hapd = hostapd.add_ap(apdev[0], params)
1393    bssid = apdev[0]['bssid']
1394    params = {"ssid": "test",
1395              "channel": "1",
1396              "ieee80211ax": "1",
1397              "ieee80211n": "1"}
1398    hapd2 = hostapd.add_ap(apdev[1], params)
1399    bssid2 = apdev[1]['bssid']
1400
1401    dev[0].scan_for_bss(bssid, freq=2412)
1402    dev[0].scan_for_bss(bssid2, freq=2412)
1403    dev[0].connect("test", key_mgmt="NONE", scan_freq="2412")
1404    if dev[0].get_status_field('bssid') != bssid2:
1405        raise Exception("Unexpected BSS selected")
1406
1407    est = dev[0].get_bss(bssid)['est_throughput']
1408    if est != "65000":
1409        raise Exception("Unexpected BSS0 est_throughput: " + est)
1410
1411    est = dev[0].get_bss(bssid2)['est_throughput']
1412    if est != "143402":
1413        raise Exception("Unexpected BSS1 est_throughput: " + est)
1414
1415def test_he_capab_parsing(dev, apdev):
1416    """HE AP and capability parsing"""
1417    params = {"ssid": "he",
1418              "ieee80211ax": "1",
1419              "he_bss_color": "42",
1420              "he_mu_edca_ac_be_ecwmin": "7",
1421              "he_mu_edca_ac_be_ecwmax": "15"}
1422    hapd = hostapd.add_ap(apdev[0], params)
1423
1424    hapd.set("ext_mgmt_frame_handling", "1")
1425    bssid = hapd.own_addr().replace(':', '')
1426    addr = "020304050607"
1427    addr_ = "02:03:04:05:06:07"
1428
1429    tests = []
1430    mac_capa = binascii.unhexlify("0178c81a4000")
1431    phy_capa = binascii.unhexlify("00bfce0000000000000000")
1432    mcs_nss = binascii.unhexlify("faff")
1433    payload = mac_capa + phy_capa + 2*mcs_nss
1434    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1435    tests += [ (hdr + payload, True) ]
1436
1437    phy_capa = binascii.unhexlify("08bfce0000000000000000")
1438    payload = mac_capa + phy_capa + 4*mcs_nss
1439    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1440    tests += [ (hdr + payload, True) ]
1441
1442    phy_capa = binascii.unhexlify("10bfce0000000000000000")
1443    payload = mac_capa + phy_capa + 4*mcs_nss
1444    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1445    tests += [ (hdr + payload, True) ]
1446
1447    phy_capa = binascii.unhexlify("18bfce0000000000000000")
1448    payload = mac_capa + phy_capa + 6*mcs_nss
1449    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1450    tests += [ (hdr + payload, True) ]
1451
1452    # Missing PPE Threshold field
1453    phy_capa = binascii.unhexlify("00bfce0000008000000000")
1454    payload = mac_capa + phy_capa + 2*mcs_nss
1455    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1456    tests += [ (hdr + payload, False) ]
1457
1458    # Truncated PPE Threshold field
1459    phy_capa = binascii.unhexlify("00bfce0000008000000000")
1460    payload = mac_capa + phy_capa + 2*mcs_nss + struct.pack('B', 0x79)
1461    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1462    tests += [ (hdr + payload, False) ]
1463
1464    # Extra field at the end (without PPE Threshold field)
1465    phy_capa = binascii.unhexlify("00bfce0000000000000000")
1466    payload = mac_capa + phy_capa + 2*mcs_nss
1467    payload += struct.pack('B', 0)
1468    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1469    tests += [ (hdr + payload, True) ]
1470
1471    # Extra field at the end (with PPE Threshold field)
1472    phy_capa = binascii.unhexlify("00bfce0000008000000000")
1473    payload = mac_capa + phy_capa + 2*mcs_nss
1474    payload += binascii.unhexlify("79000000000000")
1475    payload += struct.pack('B', 0)
1476    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1477    tests += [ (hdr + payload, True) ]
1478
1479    ppet = []
1480    # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x0
1481    # --> 3 + 4 + 0 * 6 * 1 = 7 bits --> 1 octet
1482    ppet += ["00"]
1483    # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x1
1484    # --> 3 + 4 + 1 * 6 * 1 = 13 bits --> 2 octets
1485    ppet += ["08" + "00"]
1486    # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x8
1487    # --> 3 + 4 + 1 * 6 * 1 = 13 bits --> 2 octets
1488    ppet += ["40" + "00"]
1489    # NSTS=2 (i.e., NSTS field value 1), RU Index Bitmask=0xf
1490    # --> 3 + 4 + 4 * 6 * 2 = 55 bits --> 7 octets
1491    ppet += ["79" + 6*"00"]
1492    # NSTS=3 (i.e., NSTS field value 2), RU Index Bitmask=0xf
1493    # --> 3 + 4 + 4 * 6 * 3 = 79 bits --> 10 octets
1494    ppet += ["7a" + 9*"00"]
1495    # NSTS=4 (i.e., NSTS field value 3), RU Index Bitmask=0x5
1496    # --> 3 + 4 + 2 * 6 * 4 = 55 bits --> 7 octets
1497    ppet += ["2b" + 6*"00"]
1498    # NSTS=8 (i.e., NSTS field value 7), RU Index Bitmask=0xf
1499    # --> 3 + 4 + 4 * 6 * 8 = 199 bits --> 25 octets
1500    ppet += ["ff" + 24*"00"]
1501    for p in ppet:
1502        phy_capa = binascii.unhexlify("00bfce0000008000000000")
1503        payload = mac_capa + phy_capa + 2*mcs_nss + binascii.unhexlify(p)
1504        hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1505        tests += [ (hdr + payload, True) ]
1506
1507    for capab, result in tests:
1508        auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
1509        if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth):
1510            raise Exception("MGMT_RX_PROCESS failed")
1511
1512        he_capab = binascii.hexlify(capab).decode()
1513
1514        ies = "00026865" # SSID
1515        ies += "010802040b160c121824" # Supp Rates
1516        ies += "32043048606c" # Ext Supp Rates
1517        ies += "2d1afe131bffff000000000000000000000100000000000000000000" # HT Capab
1518        ies += "7f0b04004a0201404040000120" # Ext Capab
1519        ies += he_capab
1520        ies += "3b155151525354737475767778797a7b7c7d7e7f808182" # Supp Op Classes
1521        ies += "dd070050f202000100" # WMM
1522
1523        assoc_req = "00003a01" + bssid + addr + bssid + "2000" + "21040500" + ies
1524        if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc_req):
1525            raise Exception("MGMT_RX_PROCESS failed")
1526
1527        sta = hapd.get_sta(addr_)
1528        if result:
1529            if "[HE]" not in sta['flags']:
1530                raise Exception("Missing STA flag: HE (HE Capab: %s)" % he_capab)
1531        else:
1532            if "[HE]" in sta['flags']:
1533                raise Exception("Unexpected STA flag: HE (HE Capab: %s)" % he_capab)
1534
1535        deauth = "c0003a01" + bssid + addr + bssid + "3000" + "0300"
1536        if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % deauth):
1537            raise Exception("MGMT_RX_PROCESS failed")
1538
1539        hapd.dump_monitor()
1540
1541def test_he_cw_change_notification(dev, apdev):
1542    """HE AP on 80 MHz channel and CW change notification"""
1543    try:
1544        hapd = None
1545        params = {"ssid": "he",
1546                  "country_code": "FI",
1547                  "hw_mode": "a",
1548                  "channel": "36",
1549                  "ht_capab": "[HT40+]",
1550                  "ieee80211n": "1",
1551                  "ieee80211ac": "1",
1552                  "ieee80211ax": "1",
1553                  "vht_oper_chwidth": "1",
1554                  "vht_oper_centr_freq_seg0_idx": "42",
1555                  "he_oper_chwidth": "1",
1556                  "he_oper_centr_freq_seg0_idx": "42"}
1557        hapd = hostapd.add_ap(apdev[0], params)
1558        bssid = apdev[0]['bssid']
1559
1560        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
1561        dev[1].connect("he", key_mgmt="NONE", scan_freq="5180",
1562                       disable_he="1")
1563        dev[2].connect("he", key_mgmt="NONE", scan_freq="5180",
1564                       disable_he="1", disable_vht="1")
1565
1566        sta = hapd.get_sta(dev[0].own_addr())
1567        logger.info("hostapd STA0: " + str(sta))
1568        if "[HT]" not in sta['flags']:
1569            raise Exception("Missing STA0 flag: HT")
1570        if "[VHT]" not in sta['flags']:
1571            raise Exception("Missing STA0 flag: VHT")
1572        if "[HE]" not in sta['flags']:
1573            raise Exception("Missing STA0 flag: HE")
1574
1575        sta = hapd.get_sta(dev[1].own_addr())
1576        logger.info("hostapd STA1: " + str(sta))
1577        if "[HT]" not in sta['flags']:
1578            raise Exception("Missing STA1 flag: HT")
1579        if "[VHT]" not in sta['flags']:
1580            raise Exception("Missing STA1 flag: VHT")
1581        if "[HE]" in sta['flags']:
1582            raise Exception("Unexpected STA1 flag: HE")
1583
1584        sta = hapd.get_sta(dev[2].own_addr())
1585        logger.info("hostapd STA1: " + str(sta))
1586        if "[HT]" not in sta['flags']:
1587            raise Exception("Missing STA2 flag: HT")
1588        if "[VHT]" in sta['flags']:
1589            raise Exception("Unxpected STA2 flag: VHT")
1590        if "[HE]" in sta['flags']:
1591            raise Exception("Unexpected STA2 flag: HE")
1592
1593        for i in [2, 1, 0]:
1594            if "OK" not in hapd.request("NOTIFY_CW_CHANGE %d" % i):
1595                raise Exception("NOTIFY_CW_CHANGE %d failed" % i)
1596
1597            time.sleep(1)
1598            hwsim_utils.test_connectivity(dev[0], hapd)
1599            hwsim_utils.test_connectivity(dev[1], hapd)
1600            hwsim_utils.test_connectivity(dev[2], hapd)
1601    except Exception as e:
1602        if isinstance(e, Exception) and str(e) == "AP startup failed":
1603            if not he_supported():
1604                raise HwsimSkip("80 MHz channel not supported in regulatory information")
1605        raise
1606    finally:
1607        dev[0].request("DISCONNECT")
1608        dev[1].request("DISCONNECT")
1609        dev[2].request("DISCONNECT")
1610        clear_regdom(hapd, dev)
1611
1612def he_verify_status(wpas, hapd, freq, bw, is_6ghz=True):
1613    status = hapd.get_status()
1614    logger.info("hostapd STATUS: " + str(status))
1615
1616    if status["ieee80211n"] != "1":
1617        raise Exception("Unexpected STATUS ieee80211n value")
1618    if status["ieee80211ac"] != "1":
1619        raise Exception("Unexpected STATUS ieee80211ac value")
1620    if status["ieee80211ax"] != "1":
1621        raise Exception("Unexpected STATUS ieee80211ax value")
1622
1623    sta = hapd.get_sta(wpas.own_addr())
1624    if "[HE]" not in sta['flags']:
1625        raise Exception("Missing STA flag: HE")
1626    if is_6ghz and "[6GHZ]" not in sta['flags']:
1627        raise Exception("Missing STA flag: 6GHZ")
1628
1629    sig = wpas.request("SIGNAL_POLL").splitlines()
1630    if "FREQUENCY=%s" % freq not in sig:
1631        raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
1632    if "WIDTH=%s MHz" % bw not in sig:
1633        raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
1634
1635def he_verify_wifi_version(dev):
1636    status = dev.get_status()
1637    logger.info("station status: " + str(status))
1638
1639    # For now, assume this is because of missing kernel support
1640    if 'wifi_generation' not in status:
1641        raise HwsimSkip("Association Request IE reporting not supported")
1642        #raise Exception("Missing wifi_generation information")
1643
1644    if status['wifi_generation'] != "6":
1645        raise Exception("Unexpected wifi_generation value: " + status['wifi_generation'])
1646
1647def test_he_6ghz_reg(dev, apdev):
1648    """TX power control on 6 GHz"""
1649    try:
1650        ssid = "HE_6GHz_regulatory"
1651        freq = 5975
1652        bw = "20"
1653        hapd = None
1654        params = {"ssid": ssid,
1655                  "country_code": "DE",
1656                  "hw_mode": "a",
1657                  "ieee80211ax": "1",
1658                  "wpa": "2",
1659                  "rsn_pairwise": "CCMP",
1660                  "wpa_key_mgmt": "SAE",
1661                  "sae_pwe": "1",
1662                  "sae_password": "password",
1663                  "ieee80211w": "2",
1664                  "channel": "5",
1665                  "op_class": "131",
1666                  "he_oper_centr_freq_seg0_idx": "5",
1667                  "ieee80211d": "1",
1668                  "ieee80211h": "1",
1669                  "ieee80211n": "1",
1670                  "ieee80211ac": "1",
1671                  "local_pwr_constraint": "4",
1672                  # Set the 6 GHz regulatory power configuration
1673                  "he_6ghz_reg_pwr_type": "0",
1674                  # Note: hostapd uses "Maximum Transmit Power Interpretation"
1675                  # set to "Regulatory client EIRP PSD", so the values should
1676                  # be set accordingly.
1677                  "reg_def_cli_eirp_psd": "3",
1678                  "reg_sub_cli_eirp_psd": "2"}
1679
1680        hapd = hostapd.add_ap(apdev[0], params, set_channel=False)
1681
1682        dev[0].set("sae_pwe", "1")
1683        dev[0].set("sae_groups", "")
1684        dev[0].connect(ssid, sae_password="password", key_mgmt="SAE",
1685                       ieee80211w="2", scan_freq=str(freq))
1686        hapd.wait_sta()
1687
1688        he_verify_status(dev[0], hapd, freq, bw)
1689        he_verify_wifi_version(dev[0])
1690        hwsim_utils.test_connectivity(dev[0], hapd)
1691
1692        # Configure different values related to power constraints and update
1693        # the Beacon frame contents.
1694        hapd.set("local_pwr_constraint", "2")
1695        hapd.set("he_6ghz_reg_pwr_type", "2")
1696        hapd.set("reg_def_cli_eirp_psd", "2")
1697        hapd.set("reg_sub_cli_eirp_psd", "1")
1698
1699        # In addition, inject a Transmit Power Envelope as an vendor element
1700        hapd.set("vendor_elements", "c303190202")
1701
1702        if "OK" not in hapd.request("UPDATE_BEACON"):
1703            raise Exception("UPDATE_BEACON failed")
1704
1705        # Allow few more Beacon frames
1706        time.sleep(0.5)
1707
1708        # Modify the regulatory power type to SP and provide the client EIRP
1709        # limit
1710        # EIRP = PSD + 10 * log(channel width)
1711        # 16 = 3 + 10 * log(20)
1712        hapd.set("vendor_elements", "")
1713        hapd.set("he_6ghz_reg_pwr_type", "1")
1714        hapd.set("reg_def_cli_eirp", "14")
1715
1716        if "OK" not in hapd.request("UPDATE_BEACON"):
1717            raise Exception("UPDATE_BEACON failed")
1718
1719        # Allow few more Beacon frames
1720        time.sleep(0.5)
1721    except Exception as e:
1722        if isinstance(e, Exception) and str(e) == "AP startup failed":
1723            if not he_supported():
1724                raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
1725        raise
1726    finally:
1727        dev[0].request("DISCONNECT")
1728        dev[0].set("sae_pwe", "0")
1729        dev[0].wait_disconnected()
1730        clear_regdom(hapd, dev)
1731
1732def test_he_downgrade_40mhz_to_20mhz(dev, apdev):
1733    """HE AP and downgrade from 40 MHz to 20 MHz due to regulatory constraints"""
1734    # Try to configure 40 MHz channel when the regdb limits this frequency to
1735    # 20 MHz.
1736    params = {"ssid": "he",
1737              "country_code": "AM",
1738              "channel": "36",
1739              "op_class": "116",
1740              "ieee80211n": "1",
1741              "ieee80211ac": "1",
1742              "ieee80211ax": "1",
1743              "hw_mode": "a",
1744              "ht_capab": "[HT40+]",
1745              "vht_oper_chwidth": "0",
1746              "he_oper_chwidth": "0" }
1747    run_he_downgrade_to_20_mhz(dev, apdev, params)
1748
1749def test_he_downgrade_40mhz_plus_minus_to_20mhz(dev, apdev):
1750    """HE AP and downgrade from 40 MHz (+/-) to 20 MHz due to regulatory constraints"""
1751    # Try to configure 40 MHz channel when the regdb limits this frequency to
1752    # 20 MHz.
1753    params = {"ssid": "he",
1754              "country_code": "AM",
1755              "channel": "36",
1756              "op_class": "116",
1757              "ieee80211n": "1",
1758              "ieee80211ac": "1",
1759              "ieee80211ax": "1",
1760              "hw_mode": "a",
1761              "ht_capab": "[HT40+][HT40-]",
1762              "vht_oper_chwidth": "0",
1763              "he_oper_chwidth": "0" }
1764    run_he_downgrade_to_20_mhz(dev, apdev, params)
1765
1766def test_he_downgrade_80mhz_to_20mhz(dev, apdev):
1767    """HE AP and downgrade from 80 MHz to 20 MHz due to regulatory constraints"""
1768    # Try to configure 80 MHz channel when the regdb limits this frequency to
1769    # 20 MHz.
1770    params = {"ssid": "he",
1771              "country_code": "AM",
1772              "channel": "36",
1773              "op_class": "128",
1774              "ieee80211n": "1",
1775              "ieee80211ac": "1",
1776              "ieee80211ax": "1",
1777              "hw_mode": "a",
1778              "ht_capab": "[HT40+]",
1779              "vht_oper_centr_freq_seg0_idx": "42",
1780              "he_oper_centr_freq_seg0_idx": "42",
1781              "vht_oper_chwidth": "1",
1782              "he_oper_chwidth": "1" }
1783    run_he_downgrade_to_20_mhz(dev, apdev, params)
1784
1785def run_he_downgrade_to_20_mhz(dev, apdev, params):
1786    try:
1787        hapd = None
1788        hapd = hostapd.add_ap(apdev[0], params)
1789        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
1790        sig = dev[0].request("SIGNAL_POLL").splitlines()
1791        logger.info("SIGNAL_POLL: " + str(sig))
1792        if "WIDTH=20 MHz" not in sig:
1793            raise Exception("20 MHz channel width not reported")
1794        dev[0].request("DISCONNECT")
1795        dev[0].wait_disconnected()
1796        hapd.wait_sta_disconnect()
1797    finally:
1798        dev[0].request("DISCONNECT")
1799        clear_regdom(hapd, dev)
1800
1801def test_he_bss_color_change(dev, apdev):
1802    """HE AP with color change"""
1803    params = {"ssid": "test_he",
1804              "ieee80211ax": "1",
1805              "he_bss_color": "42",
1806              "he_mu_edca_ac_be_ecwmin": "7",
1807              "he_mu_edca_ac_be_ecwmax": "15"}
1808    hapd = hostapd.add_ap(apdev[0], params)
1809    if hapd.get_status_field("ieee80211ax") != "1":
1810        raise Exception("STATUS did not indicate ieee80211ax=1")
1811
1812    color = hapd.get_status_field("he_bss_color")
1813    if color != "42":
1814        raise Exception("Expected current he_bss_color to be 42; was " + color)
1815
1816    # Small sleep to capture Beacon frames before the change
1817    time.sleep(0.5)
1818
1819    # Change color by doing CCA
1820    if "OK" not in hapd.request("COLOR_CHANGE 20"):
1821        raise Exception("COLOR_CHANGE failed")
1822    time.sleep(1.5)
1823
1824    color = hapd.get_status_field("he_bss_color")
1825    if color != "20":
1826        raise Exception("Expected current he_bss_color to be 20")
1827
1828    # Disable color by setting value to 0
1829    if "OK" not in hapd.request("COLOR_CHANGE 0"):
1830        raise Exception("COLOR_CHANGE failed")
1831    time.sleep(1.5)
1832
1833    color = hapd.get_status_field("he_bss_color")
1834    if color is not None:
1835        raise Exception("Expected he_bss_color to get disabled but found " + color)
1836
1837    # Enable color back by setting same previous color value
1838    if "OK" not in hapd.request("COLOR_CHANGE 20"):
1839        raise Exception("COLOR_CHANGE failed")
1840    time.sleep(1.5)
1841
1842    color = hapd.get_status_field("he_bss_color")
1843    if color != "20":
1844        raise Exception("Expected current he_bss_color to be 20")
1845
1846    hapd.dump_monitor()
1847