1# EHT tests
2# Copyright (c) 2022-2024, Qualcomm Innovation Center, Inc.
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
8import tempfile
9
10import hostapd
11from utils import *
12from hwsim import HWSimRadio
13import hwsim_utils
14from wpasupplicant import WpaSupplicant
15import re
16from tshark import run_tshark
17from test_gas import hs20_ap_params
18from test_dpp import check_dpp_capab, wait_auth_success
19from test_rrm import build_beacon_request, run_req_beacon, BeaconReport
20
def eht_verify_wifi_version(dev):
    """Check that the station status reports Wi-Fi generation 7.

    The status dictionary is fetched with dev.get_status() and logged. An
    Exception is raised if the wifi_generation entry is missing or if its
    value is anything other than the string "7".
    """
    sta_status = dev.get_status()
    logger.info("station status: " + str(sta_status))

    if 'wifi_generation' not in sta_status:
        raise Exception("Missing wifi_generation information")

    generation = sta_status['wifi_generation']
    if generation == "7":
        return
    raise Exception("Unexpected wifi_generation value: " + generation)
29
def _eht_get_links_bitmap(wpas, name):
    """Read a links bitmap from the interface's mac80211 debugfs entry.

    The file path is built from the phy name reported by
    wpas.get_driver_status_field("phyname"), wpas.ifname, and name.

    Returns 0 if the file cannot be listed (treated as "not supported"),
    otherwise the file contents parsed as a hexadecimal integer.

    Raises an Exception if the file exists but cannot be read.
    """
    vfile = "/sys/kernel/debug/ieee80211/%s/netdev:%s/%s" % \
        (wpas.get_driver_status_field("phyname"), wpas.ifname, name)

    # Probe for the file first so that a kernel without this debugfs entry
    # results in an empty bitmap instead of a test failure.
    if wpas.cmd_execute(["ls", vfile])[0] != 0:
        logger.info("%s not supported in mac80211: %s" % (name, vfile))
        return 0

    res, out = wpas.cmd_execute(["cat", vfile], shell=True)
    if res != 0:
        raise Exception("Failed to read %s" % vfile)

    logger.info("%s=%s" % (name, out))
    return int(out, 16)
44
def _eht_valid_links(wpas):
    """Return the "valid_links" bitmap read via _eht_get_links_bitmap()."""
    return _eht_get_links_bitmap(wpas, name="valid_links")
47
def _eht_active_links(wpas):
    """Return the "active_links" bitmap read via _eht_get_links_bitmap()."""
    return _eht_get_links_bitmap(wpas, name="active_links")
50
def _eht_dormant_links(wpas):
    """Return the "dormant_links" bitmap read via _eht_get_links_bitmap()."""
    return _eht_get_links_bitmap(wpas, name="dormant_links")
53
def _eht_verify_links(wpas, valid_links=0, active_links=0):
    """Compare the valid and active link bitmaps against expected values.

    The valid links bitmap is read and checked first, then the active links
    bitmap. An Exception showing both values in hexadecimal is raised on the
    first mismatch.
    """
    expectations = (
        (_eht_valid_links, valid_links,
         "Unexpected valid links (0x%04x != 0x%04x)"),
        (_eht_active_links, active_links,
         "Unexpected active links (0x%04x != 0x%04x)"),
    )
    for read_bitmap, expected, fmt in expectations:
        actual = read_bitmap(wpas)
        if actual != expected:
            raise Exception(fmt % (actual, expected))
62
def eht_verify_status(wpas, hapd, freq, bw, is_ht=False, is_vht=False,
                      mld=False, valid_links=0, active_links=0):
    """Verify AP and station state after an EHT association.

    Checks, in order:
    - hostapd STATUS reports ieee80211ax and ieee80211be (and ieee80211n /
      ieee80211ac when is_ht / is_vht are set) as "1"
    - hostapd has a STA entry for wpas with the HE and EHT flags (and HT /
      VHT when requested)
    - unless mld is set, SIGNAL_POLL on the station reports the given
      frequency and bandwidth
    - the station's valid/active link bitmaps match valid_links and
      active_links

    Raises an Exception on the first check that fails.
    """
    ap_status = hapd.get_status()
    logger.info("hostapd STATUS: " + str(ap_status))

    # (check enabled, STATUS key, error message)
    status_checks = (
        (is_ht, "ieee80211n", "Unexpected STATUS ieee80211n value"),
        (is_vht, "ieee80211ac", "Unexpected STATUS ieee80211ac value"),
        (True, "ieee80211ax", "Unexpected STATUS ieee80211ax value"),
        (True, "ieee80211be", "Unexpected STATUS ieee80211be value"),
    )
    for enabled, key, err in status_checks:
        if enabled and ap_status[key] != "1":
            raise Exception(err)

    sta = hapd.get_sta(wpas.own_addr())
    logger.info("hostapd STA: " + str(sta))
    if sta['addr'] == 'FAIL':
        raise Exception("hostapd " + hapd.ifname + " did not have a STA entry for the STA " + wpas.own_addr())

    # (check enabled, STA flag, error message)
    flag_checks = (
        (is_ht, "[HT]", "Missing STA flag: HT"),
        (is_vht, "[VHT]", "Missing STA flag: VHT"),
        (True, "[HE]", "Missing STA flag: HE"),
        (True, "[EHT]", "Missing STA flag: EHT"),
    )
    for enabled, flag, err in flag_checks:
        if enabled and flag not in sta['flags']:
            raise Exception(err)

    sig = wpas.request("SIGNAL_POLL").splitlines()

    # TODO: With MLD connection, signal poll logic is still not implemented.
    # While mac80211 maintains the station using the MLD address, the
    # information is maintained in the link stations, but it is not sent to
    # user space yet.
    if not mld:
        if "FREQUENCY=%s" % freq not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
        if "WIDTH=%s MHz" % bw not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))

    # Active links are updated in async work after the connection.
    # Sleep a bit to allow it to run.
    time.sleep(0.1)
    _eht_verify_links(wpas, valid_links, active_links)
106
def traffic_test(wpas, hapd, success=True):
    """Run hwsim_utils.test_connectivity() between wpas and hapd.

    success is forwarded as the success_expected argument.
    """
    hwsim_utils.test_connectivity(wpas, hapd, success_expected=success)
109
def test_eht_open(dev, apdev):
    """EHT AP with open mode configuration"""
    ap_config = {"ssid": "eht",
                 "ieee80211ax": "1",
                 "ieee80211be": "1"}
    try:
        hapd = hostapd.add_ap(apdev[0], ap_config)
    except Exception as e:
        # Only a rejected ieee80211be parameter means "no EHT support";
        # any other failure is a real error and is re-raised unchanged.
        if str(e) != "Failed to set hostapd parameter ieee80211be":
            raise
        raise HwsimSkip("EHT not supported")

    if hapd.get_status_field("ieee80211be") != "1":
        raise Exception("AP STATUS did not indicate ieee80211be=1")

    dev[0].connect("eht", key_mgmt="NONE", scan_freq="2412")

    # AP side: the station entry must carry the EHT flag.
    sta_entry = hapd.get_sta(dev[0].own_addr())
    if "[EHT]" not in sta_entry['flags']:
        raise Exception("Missing STA flag: EHT")

    # Station side: STATUS must report Wi-Fi generation 7.
    if "wifi_generation=7" not in dev[0].request("STATUS"):
        raise Exception("STA STATUS did not indicate wifi_generation=7")
131
def test_prefer_eht_20(dev, apdev):
    """EHT AP on a 20 MHz channel"""
    ap_config = {"ssid": "eht",
                 "channel": "1",
                 "ieee80211ax": "1",
                 "ieee80211be": "1",
                 "ieee80211n": "1"}
    try:
        # First AP advertises EHT; the second uses the same configuration
        # with EHT turned off.
        hapd0 = hostapd.add_ap(apdev[0], ap_config)

        ap_config["ieee80211be"] = "0"
        hapd1 = hostapd.add_ap(apdev[1], ap_config)
    except Exception as e:
        if str(e) != "Failed to set hostapd parameter ieee80211be":
            raise
        raise HwsimSkip("EHT not supported")

    sta = dev[0]
    sta.connect("eht", key_mgmt="NONE")

    # The station is expected to pick the EHT-enabled AP.
    eht_bssid = apdev[0]['bssid']
    if sta.get_status_field('bssid') != eht_bssid:
        raise Exception("dev[0] connected to unexpected AP")

    est = sta.get_bss(eht_bssid)['est_throughput']
    if est != "172103":
        raise Exception("Unexpected BSS1 est_throughput: " + est)
157
def start_eht_sae_ap(apdev, ml=False, transition_mode=False):
    """Start an EHT AP with SSID "eht" using SAE and return it.

    apdev: AP device entry passed on to hostapd.add_ap()
    ml: when set, a Multi-Link element is added through vendor_elements
    transition_mode: when set, use the more permissive cipher/AKM/group
        settings (CCMP, WPA-PSK, SAE group 19, optional MFP) instead of the
        GCMP-256 / SAE-EXT-KEY only configuration

    Returns the object returned by hostapd.add_ap().

    Raises HwsimSkip if hostapd rejects the ieee80211be parameter; any other
    exception from hostapd.add_ap() is propagated unchanged.
    """
    params = hostapd.wpa2_params(ssid="eht", passphrase="12345678")
    params["ieee80211ax"] = "1"
    params["ieee80211be"] = "1"
    params['ieee80211w'] = '1' if transition_mode else '2'
    params['rsn_pairwise'] = "CCMP GCMP-256" if transition_mode else "GCMP-256"
    params['group_cipher'] = "CCMP" if transition_mode else "GCMP-256"
    params["group_mgmt_cipher"] = "AES-128-CMAC" if transition_mode else "BIP-GMAC-256"
    params['beacon_prot'] = '1'
    params['wpa_key_mgmt'] = "SAE SAE-EXT-KEY WPA-PSK WPA-PSK-SHA256" if transition_mode else 'SAE-EXT-KEY'
    params['sae_groups'] = "19 20" if transition_mode else "20"
    params['sae_pwe'] = "2" if transition_mode else "1"
    if ml:
        # Hand-built Multi-Link element injected as a vendor element.
        ml_elem = "ff0d6b" + "3001" + "0a" + "021122334455" + "01" + "00" + "00"
        params['vendor_elements'] = ml_elem
    try:
        hapd = hostapd.add_ap(apdev, params)
    except Exception as e:
        if str(e) == "Failed to set hostapd parameter ieee80211be":
            raise HwsimSkip("EHT not supported")
        raise

    # Callers assign the result, so hand back the AP instead of None.
    return hapd
180
def test_eht_sae(dev, apdev):
    """EHT AP with SAE"""
    sta = dev[0]
    check_sae_capab(sta)

    hapd = start_eht_sae_ap(apdev[0])

    # Station network profile for the connection attempt.
    profile = {"key_mgmt": "SAE-EXT-KEY", "psk": "12345678",
               "ieee80211w": "2", "beacon_prot": "1",
               "pairwise": "GCMP-256", "group": "GCMP-256",
               "group_mgmt": "BIP-GMAC-256", "scan_freq": "2412"}
    try:
        sta.set("sae_groups", "20")
        sta.set("sae_pwe", "2")
        sta.connect("eht", **profile)
    finally:
        # Always put the global SAE settings back, even on failure.
        sta.set("sae_groups", "")
        sta.set("sae_pwe", "0")
196
def test_eht_sae_mlo(dev, apdev):
    """EHT+MLO AP with SAE"""
    sta = dev[0]
    check_sae_capab(sta)

    # Same AP as the plain SAE case, but with the Multi-Link element added.
    hapd = start_eht_sae_ap(apdev[0], ml=True)

    profile = {"key_mgmt": "SAE-EXT-KEY", "psk": "12345678",
               "ieee80211w": "2", "beacon_prot": "1",
               "pairwise": "GCMP-256", "group": "GCMP-256",
               "group_mgmt": "BIP-GMAC-256", "scan_freq": "2412"}
    try:
        sta.set("sae_groups", "20")
        sta.set("sae_pwe", "2")
        sta.connect("eht", **profile)
    finally:
        # Always put the global SAE settings back, even on failure.
        sta.set("sae_groups", "")
        sta.set("sae_pwe", "0")
212
def test_eht_sae_mlo_tm(dev, apdev):
    """EHT+MLO AP with SAE and transition mode"""
    sta0, sta1, sta2 = dev[0], dev[1], dev[2]
    check_sae_capab(sta0)
    check_sae_capab(sta1)

    hapd = start_eht_sae_ap(apdev[0], ml=True, transition_mode=True)
    try:
        # First station: SAE group 20 with GCMP-256 pairwise cipher.
        sta0.set("sae_groups", "20")
        sta0.set("sae_pwe", "2")
        sta0.connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678",
                     ieee80211w="2", beacon_prot="1",
                     pairwise="GCMP-256", group="CCMP",
                     group_mgmt="AES-128-CMAC", scan_freq="2412")

        # Second station: SAE group 19, CCMP, with EHT disabled.
        sta1.set("sae_groups", "19")
        sta1.connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678",
                     ieee80211w="2", beacon_prot="1",
                     pairwise="CCMP", group="CCMP",
                     group_mgmt="AES-128-CMAC", scan_freq="2412",
                     disable_eht="1")

        # Third station: WPA-PSK, CCMP, with EHT disabled.
        sta2.connect("eht", key_mgmt="WPA-PSK", psk="12345678",
                     pairwise="CCMP", group="CCMP",
                     group_mgmt="AES-128-CMAC", scan_freq="2412",
                     disable_eht="1")
    finally:
        # Undo the global SAE settings changed above.
        sta0.set("sae_groups", "")
        sta0.set("sae_pwe", "0")
        sta1.set("sae_groups", "")
240
def eht_mld_enable_ap(iface, params):
    """Add an AP MLD link on iface, enable it, and wait for it to start.

    Returns the object from hostapd.add_mld_link(). Raises an Exception if
    neither AP-ENABLED nor AP-DISABLED is seen within one second, or if the
    event received is not AP-ENABLED.
    """
    link = hostapd.add_mld_link(iface, params)
    link.enable()

    startup_ev = link.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=1)
    if startup_ev is None:
        raise Exception("AP startup timed out")
    if "AP-ENABLED" in startup_ev:
        return link
    raise Exception("AP startup failed")
252
def eht_mld_ap_wpa2_params(ssid, passphrase=None, key_mgmt="WPA-PSK-SHA256",
                           mfp="2", pwe=None, beacon_prot="1"):
    """Build hostapd parameters for one EHT AP MLD link.

    Starts from hostapd.wpa2_params() and adds HT/HE/EHT on channel 1
    (hw_mode g), AES-128-CMAC as the group management cipher, and the given
    beacon protection setting. sae_pwe is only included when pwe is not None.

    Returns the resulting parameter dictionary.
    """
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
                                 wpa_key_mgmt=key_mgmt, ieee80211w=mfp)
    params.update({
        'ieee80211n': '1',
        'ieee80211ax': '1',
        'ieee80211be': '1',
        'channel': '1',
        'hw_mode': 'g',
        'group_mgmt_cipher': "AES-128-CMAC",
        'beacon_prot': beacon_prot,
    })

    if pwe is None:
        return params

    params['sae_pwe'] = pwe
    return params
269
def _eht_mld_probe_req(wpas, hapd, tsf0, link_id=-1):
    """Send an ML probe request to hapd and check that its BSS was refreshed.

    Issues ML_PROBE_REQ for hapd's address with the given link_id, waits for
    the scan to start and complete, and then requires the TSF of the BSS
    entry to be greater than tsf0.

    Raises an Exception if the request is rejected, the scan events do not
    arrive, the BSS entry is missing, or the TSF did not advance.
    """
    cmd = "ML_PROBE_REQ bssid=%s mld_id=0 link_id=%d" % (hapd.own_addr(), link_id)
    if "OK" not in wpas.request(cmd):
        raise Exception("Failed to request ML probe request")

    scan_steps = (("CTRL-EVENT-SCAN-STARTED", "Scan did not start"),
                  ("CTRL-EVENT-SCAN-RESULTS", "Scan did not complete"))
    for event, err in scan_steps:
        if wpas.wait_event([event]) is None:
            raise Exception(err)

    logger.info("ML Probe request scan done")

    entry = wpas.get_bss(hapd.own_addr())
    if not entry:
        raise Exception("AP did not reply to ML probe request")

    tsf1 = int(entry['tsf'])
    logger.info("tsf0=%s, tsf1=%s" % (tsf0, tsf1))

    # An unchanged (or older) TSF means the entry was not updated by this scan.
    if not tsf0 < tsf1:
        raise Exception("AP was not found in ML probe request scan")
293
def test_eht_mld_discovery(dev, apdev):
    """EHT MLD AP discovery"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        # Two links of the same AP MLD on channels 1 and 2.
        ssid = "mld_ap"
        link0_params = {"ssid": ssid,
                        "hw_mode": "g",
                        "channel": "1"}
        link1_params = {"ssid": ssid,
                        "hw_mode": "g",
                        "channel": "2"}

        hapd0 = eht_mld_enable_ap(hapd_iface, link0_params)
        hapd1 = eht_mld_enable_ap(hapd_iface, link1_params)

        # Only scan link 0
        res = wpas.request("SCAN freq=2412")
        if "FAIL" in res:
            raise Exception("Failed to start scan")

        ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
        if ev is None:
            raise Exception("Scan did not start")

        ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
        if ev is None:
            raise Exception("Scan did not complete")

        logger.info("Scan done")

        # Patterns matched against the BSS command output to detect the
        # parsed RNR and Multi-Link element information.
        rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=",
                                 re.MULTILINE)
        ml_pattern = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE)

        bss = wpas.request("BSS " + hapd0.own_addr())
        logger.info("BSS 0: " + str(bss))

        if rnr_pattern.search(bss) is None:
            raise Exception("RNR element not found for first link")

        if ml_pattern.search(bss) is None:
            raise Exception("ML element not found for first link")

        # Save the tsf0 for checking ML Probe request scan later
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])

        if wpas.get_bss(hapd1.own_addr()) is not None:
            raise Exception("BSS for link 1 found without ML probe request")

        # Now send an ML probe request (for all links)
        _eht_mld_probe_req(wpas, hapd0, tsf0)
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])

        # NOTE: hostapd incorrectly reports a TSF offset of zero
        # This only works because the source is always the ML probe response
        tsf1 = int(wpas.get_bss(hapd1.own_addr())['tsf'])

        bss = wpas.request("BSS " + hapd1.own_addr())
        logger.info("BSS 1: " + str(bss))

        if rnr_pattern.search(bss) is None:
            raise Exception("RNR element not found for second link")

        if ml_pattern.search(bss) is None:
            raise Exception("ML element not found for second link")

        # Probing specifically for link ID 1 must refresh the second link.
        _eht_mld_probe_req(wpas, hapd0, tsf0, link_id=1)
        if int(wpas.get_bss(hapd1.own_addr())['tsf']) <= tsf1:
            raise Exception("Probe for link ID did not update BSS")
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])
        tsf1 = int(wpas.get_bss(hapd1.own_addr())['tsf'])

        # Probing the wrong link ID should not update second link
        _eht_mld_probe_req(wpas, hapd0, tsf0, link_id=4)
        if int(wpas.get_bss(hapd1.own_addr())['tsf']) != tsf1:
            # A changed TSF means the entry was refreshed although it
            # should have been left alone.
            raise Exception("Probe for other link ID updated BSS")
374
def test_eht_mld_owe_two_links(dev, apdev):
    """EHT MLD AP with MLD client OWE connection using two links"""
    # Baseline variant: both links enabled and both negotiated.
    _eht_mld_owe_two_links(dev, apdev, second_link_disabled=False,
                           only_one_link=False)
378
def _eht_mld_owe_two_links(dev, apdev, second_link_disabled=False,
                           only_one_link=False):
    """Run the two-link OWE AP MLD scenario shared by several test cases.

    second_link_disabled: start the second link with mld_indicate_disabled
        set and expect it to show up as dormant on the station
    only_one_link: restrict the station to the first link using
        bssid_filter so that only one link is negotiated
    """
    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_owe_two_link"
        ap_params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")

        hapd0 = eht_mld_enable_ap(hapd0_iface, ap_params)

        # The second link reuses the parameters on channel 6 and is added on
        # the same interface as the first one.
        ap_params['channel'] = '6'
        if second_link_disabled:
            ap_params['mld_indicate_disabled'] = '1'

        hapd1 = eht_mld_enable_ap(hapd0_iface, ap_params)
        # Check legacy client connection
        dev[0].connect(ssid, scan_freq="2437", key_mgmt="OWE", ieee80211w="2")

        if only_one_link:
            wpas.set("bssid_filter", hapd0.get_status_field("link_addr"))
        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")

        if second_link_disabled and _eht_dormant_links(wpas) != 2:
            raise Exception("Unexpected dormant links")

        # Expected link bitmaps for the selected variant.
        if only_one_link:
            valid_links, active_links = 1, 1
        elif second_link_disabled:
            valid_links, active_links = 3, 1
        else:
            valid_links, active_links = 3, 3

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=valid_links, active_links=active_links)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

        if not second_link_disabled:
            traffic_test(wpas, hapd1)

        if only_one_link:
            wpas.set("bssid_filter", "")
428
def test_eht_mld_owe_two_links_one_disabled(dev, apdev):
    """AP MLD with MLD client OWE connection when one of the AP MLD links is disabled"""
    # Variant with the second AP link indicated as disabled.
    _eht_mld_owe_two_links(dev, apdev, second_link_disabled=True,
                           only_one_link=False)
432
def test_eht_mld_owe_two_links_only_one_negotiated(dev, apdev):
    """AP MLD with MLD client OWE connection when only one of the links is negotiated"""
    # Variant where the station is limited to the first link.
    _eht_mld_owe_two_links(dev, apdev, second_link_disabled=False,
                           only_one_link=True)
436
def test_eht_mld_sae_single_link(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using single link"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
            HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_sae_single_link"
        passphrase = 'qwertyuiop'

        # Single AP MLD link using SAE with mandatory MFP.
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE",
                                           mfp="2", pwe='2')
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)

        wpas.set("sae_pwe", "1")
        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412",
                     key_mgmt="SAE", ieee80211w="2")

        # One link only, so both bitmaps are expected to be 0x1.
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)
459
def run_eht_mld_sae_two_links(dev, apdev, beacon_prot="1",
                              disable_enable=False):
    """Run the two-link SAE AP MLD scenario shared by several test cases.

    beacon_prot: beacon protection setting used for the AP links
    disable_enable: when set, additionally disable and re-enable the whole
        AP MLD and verify that the station can reconnect and pass traffic
    """
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_sae_two_link"
        passphrase = 'qwertyuiop'
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                           key_mgmt="SAE", mfp="2", pwe='1',
                                           beacon_prot=beacon_prot)

        # Two links: channel 1 first, then the same parameters on channel 6.
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, ap_params)
        links = (hapd0, hapd1)

        wpas.set("sae_pwe", "1")

        # The first authentication attempt tries to use group 20 and the
        # authentication is expected to fail. The next authentication should
        # use group 19 and succeed.
        wpas.set("sae_groups", "20 19")

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE", ieee80211w="2", beacon_prot="1")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)

        if wpas.get_status_field('sae_group') != '19':
            raise Exception("Expected SAE group not used")

        for link in links:
            traffic_test(wpas, link)

        if not disable_enable:
            return

        if "OK" not in hapd0.request("DISABLE_MLD"):
            raise Exception("DISABLE_MLD failed")
        disabled_errors = ("AP-DISABLED not received (0)",
                           "AP-DISABLED not received (1)")
        for link, err in zip(links, disabled_errors):
            if link.wait_event(["AP-DISABLED"], timeout=1) is None:
                raise Exception(err)

        # mac80211 does not seem to detect beacon loss or deauthentication
        # in non-AP MLD case?! For now, ignore that and just force
        # disconnection locally on the STA.
        wpas.request("DISCONNECT")
        wpas.wait_disconnected()

        if "OK" not in hapd0.request("ENABLE_MLD"):
            raise Exception("ENABLE_MLD failed")
        enabled_errors = ("AP-ENABLED not received (0)",
                          "AP-ENABLED not received (1)")
        for link, err in zip(links, enabled_errors):
            if link.wait_event(["AP-ENABLED"], timeout=1) is None:
                raise Exception(err)

        # TODO: Figure out why this fails without PMKSA_FLUSH. Things should
        # fall back to full SAE from failed PMKSA caching attempt
        # automatically.
        wpas.request("PMKSA_FLUSH")

        # flush the BSS table before reconnect as otherwise the old
        # AP MLD BSSs would be in the BSS list
        wpas.request("BSS_FLUSH 0")
        wpas.request("RECONNECT")
        wpas.wait_connected()
        for link in links:
            link.wait_sta()
        for link in links:
            traffic_test(wpas, link)
539
def test_eht_mld_sae_two_links(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using two links"""
    # Baseline variant: beacon protection on, no disable/enable cycle.
    run_eht_mld_sae_two_links(dev, apdev, beacon_prot="1",
                              disable_enable=False)
543
def test_eht_mld_sae_two_links_no_beacon_prot(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using two links and no beacon protection"""
    # Variant with beacon protection turned off on the AP links.
    run_eht_mld_sae_two_links(dev, apdev, beacon_prot="0",
                              disable_enable=False)
547
def test_eht_mld_sae_two_links_disable_enable(dev, apdev):
    """AP MLD with two links and disabling/enabling full AP MLD"""
    # Variant that also cycles the whole AP MLD off and back on.
    run_eht_mld_sae_two_links(dev, apdev, beacon_prot="1",
                              disable_enable=True)
551
def test_eht_mld_sae_ext_one_link(dev, apdev):
    """EHT MLD AP with MLD client SAE-EXT H2E connection using single link"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_sae_ext_single_link"
        passphrase = 'qwertyuiop'

        # Single AP MLD link using SAE-EXT-KEY.
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                           key_mgmt="SAE-EXT-KEY")
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        # One link only, so both bitmaps are expected to be 0x1.
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)
573
def test_eht_mld_sae_ext_two_links(dev, apdev):
    """EHT MLD AP with MLD client SAE-EXT H2E connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_sae_two_link"
        passphrase = 'qwertyuiop'
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                           key_mgmt="SAE-EXT-KEY")

        # Two links: channel 1 first, then the same parameters on channel 6.
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, ap_params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        # Both links are expected to be valid and active (bitmap 0x3).
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        for link in (hapd0, hapd1):
            traffic_test(wpas, link)
601
def test_eht_mld_sae_legacy_client(dev, apdev):
    """EHT MLD AP with legacy client SAE H2E connection"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface):
        ssid = "mld_ap_sae_two_link"
        passphrase = 'qwertyuiop'
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                           key_mgmt="SAE", mfp="2", pwe='1')

        # Two links: channel 1 first, then the same parameters on channel 6.
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, ap_params)

        sta = dev[0]
        try:
            sta.set("sae_groups", "")
            sta.set("sae_pwe", "1")
            sta.connect(ssid, sae_password=passphrase, scan_freq="2412",
                        key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
            logger.info("wpa_supplicant STATUS:\n" + sta.request("STATUS"))

            # Wait for the STA entry on whichever link the client joined.
            bssid = sta.get_status_field("bssid")
            if hapd0.own_addr() == bssid:
                hapd0.wait_sta()
            elif hapd1.own_addr() == bssid:
                hapd1.wait_sta()
            else:
                raise Exception("Unknown BSSID: " + bssid)

            eht_verify_status(sta, hapd0, 2412, 20, is_ht=True)
            traffic_test(sta, hapd0)
        finally:
            # Always put the global SAE settings back, even on failure.
            sta.set("sae_groups", "")
            sta.set("sae_pwe", "0")
635
def test_eht_mld_sae_transition(dev, apdev):
    """EHT MLD AP in SAE/PSK transition mode with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_sae_two_link"
        passphrase = 'qwertyuiop'
        akms = "SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256"
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt=akms,
                                           mfp="1")

        # Two links: channel 1 first, then the same parameters on channel 6.
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, ap_params)

        # MLD client using SAE-EXT-KEY over both links.
        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        for link in (hapd0, hapd1):
            traffic_test(wpas, link)

        # Additional clients on the first link: one SAE, one WPA-PSK
        # without MFP.
        sae_sta, psk_sta = dev[0], dev[1]
        sae_sta.set("sae_groups", "")
        sae_sta.connect(ssid, sae_password=passphrase, scan_freq="2412",
                        key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
        psk_sta.connect(ssid, psk=passphrase, scan_freq="2412",
                        key_mgmt="WPA-PSK", ieee80211w="0")
670
def test_eht_mld_ptk_rekey(dev, apdev):
    """EHT MLD AP and PTK rekeying with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_sae_two_link"
        passphrase = 'qwertyuiop'
        akms = "SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256"
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt=akms,
                                           mfp="1")
        # Ask the AP to rekey the PTK (wpa_ptk_rekey=5).
        ap_params['wpa_ptk_rekey'] = '5'

        # Two links: channel 1 first, then the same parameters on channel 6.
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, ap_params)
        links = (hapd0, hapd1)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")
        # Consume the connect event from whichever link reports it; the
        # result itself is not checked.
        if hapd0.wait_event(["AP-STA-CONNECT"], timeout=1) is None:
            hapd1.wait_event(["AP-STA-CONNECT"], timeout=1)
        for link in links:
            traffic_test(wpas, link)

        rekey_ev = wpas.wait_event(["WPA: Key negotiation completed",
                                    "CTRL-EVENT-DISCONNECTED"], timeout=10)
        if rekey_ev is None:
            raise Exception("PTK rekey timed out")
        if "CTRL-EVENT-DISCONNECTED" in rekey_ev:
            raise Exception("Disconnect instead of rekey")

        # Traffic must still pass on both links with the new key.
        time.sleep(0.1)
        for link in links:
            traffic_test(wpas, link)
710
def test_eht_mld_gtk_rekey(dev, apdev):
    """AP MLD and GTK rekeying with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_sae_two_link"
        passphrase = 'qwertyuiop'
        akms = "SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256"
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt=akms,
                                           mfp="1")
        # Ask the AP to rekey the group key (wpa_group_rekey=5).
        ap_params['wpa_group_rekey'] = '5'

        # Two links: channel 1 first, then the same parameters on channel 6.
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, ap_params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")
        # Consume the connect event from whichever link reports it; the
        # result itself is not checked.
        if hapd0.wait_event(["AP-STA-CONNECT"], timeout=1) is None:
            hapd1.wait_event(["AP-STA-CONNECT"], timeout=1)
        for link in (hapd0, hapd1):
            traffic_test(wpas, link)

        # Expect two consecutive group rekeys without a disconnection.
        for _ in range(2):
            rekey_ev = wpas.wait_event(["MLO RSN: Group rekeying completed",
                                        "CTRL-EVENT-DISCONNECTED"],
                                       timeout=10)
            if rekey_ev is None:
                raise Exception("GTK rekey timed out")
            if "CTRL-EVENT-DISCONNECTED" in rekey_ev:
                raise Exception("Disconnect instead of rekey")

            #TODO: Uncomment these once GTK rekeying works for MLO
            #time.sleep(0.1)
            #traffic_test(wpas, hapd0)
            #traffic_test(wpas, hapd1)
752
def test_eht_ml_probe_req(dev, apdev):
    """AP MLD with two links and non-AP MLD sending ML Probe Request"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_sae_two_link"
        passphrase = 'qwertyuiop'
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                           key_mgmt="SAE-EXT-KEY")

        # Two links: channel 1 first, then the same parameters on channel 6.
        hapd0 = eht_mld_enable_ap(hapd_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, ap_params)

        bssid = hapd0.own_addr()
        wpas.scan_for_bss(bssid, freq=2412)

        # First probe without a link ID, then one with link_id=2. Each must
        # be accepted and end with a scan event within ten seconds.
        for tail in (" mld_id=0", " mld_id=0 link_id=2"):
            time.sleep(1)
            cmd = "ML_PROBE_REQ bssid=" + bssid + tail
            if "OK" not in wpas.request(cmd):
                raise Exception("Failed to run: " + cmd)
            scan_ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS",
                                       "CTRL-EVENT-SCAN-FAILED"], timeout=10)
            if scan_ev is None:
                raise Exception("ML_PROBE_REQ did not result in scan results")
792
def test_eht_mld_connect_probes(dev, apdev, params):
    """MLD client sends ML probe to connect to not discovered links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2",
                                             key_mgmt="SAE", pwe='2')

        # Two links on the same AP radio, each with a fixed BSSID.
        link_aps = []
        for chan, link_bssid in (('1', '00:11:22:33:44:01'),
                                 ('6', '00:11:22:33:44:02')):
            link_params['channel'] = chan
            link_params['bssid'] = link_bssid
            link_aps.append(eht_mld_enable_ap(hapd_iface, link_params))
        hapd0, hapd1 = link_aps

        # Only 2412 MHz is scanned, so the channel 6 link is not covered by
        # the scan request itself.
        wpas.set("sae_pwe", "1")
        wpas.connect(ssid, sae_password=passphrase, ieee80211w="2",
                     key_mgmt="SAE", scan_freq="2412")

        probe_req_filter = 'wlan.fc.type_subtype == 0x0004 && wlan.ext_tag.number == 107 && wlan.ext_tag.data == 11:00:02:00:00:02:11:00'
        probe_reqs = run_tshark(os.path.join(params['logdir'],
                                             'hwsim0.pcapng'),
                                probe_req_filter,
                                display=['frame.number'])
        if not probe_reqs.splitlines():
            raise Exception('ML probe request not found')

        # Probe Response frame has the ML element, which will be fragmented
        probe_resp_filter = "wlan.fc.type_subtype == 0x0005 && wlan.ext_tag.number == 107 && wlan.ext_tag.length == 254"
        probe_resps = run_tshark(os.path.join(params['logdir'],
                                              "hwsim0.pcapng"),
                                 probe_resp_filter,
                                 display=['frame.number'])
        if not probe_resps.splitlines():
            # Matching this needs a new tshark (e.g., 4.0.6). To avoid
            # forcing such an upgrade, a miss is only logged for now and is
            # not treated as a test failure.
            logger.info('ML probe response not found')

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        for link_ap in (hapd0, hapd1):
            traffic_test(wpas, link_ap)
837
def test_eht_tx_link_rejected_connect_other(dev, apdev, params):
    """EHT MLD AP with MLD client being rejected on TX link, but then connecting on second link"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2",
                                             key_mgmt="SAE", pwe='2')

        # First link on channel 1, second link on channel 6.
        link_params['channel'] = '1'
        link_params['bssid'] = '00:11:22:33:44:01'
        hapd0 = eht_mld_enable_ap(hapd_iface, link_params)
        link_params['channel'] = '6'
        link_params['bssid'] = '00:11:22:33:44:02'
        hapd1 = eht_mld_enable_ap(hapd_iface, link_params)

        wpas.set("sae_pwe", "1")
        # Inject a single failure into hostapd_get_aid on the first AP
        # instance for the duration of the connection attempt.
        with fail_test(hapd0, 1, "hostapd_get_aid"):
            wpas.connect(ssid, sae_password=passphrase, ieee80211w="2",
                         key_mgmt="SAE", scan_freq="2412")

        # The resulting association is expected on 2437 MHz with only the
        # link of bit 1 (mask 0x2) valid and active.
        eht_verify_status(wpas, hapd1, 2437, 20, is_ht=True, mld=True,
                          valid_links=2, active_links=2)
        for link_ap in (hapd0, hapd1):
            traffic_test(wpas, link_ap)
867
def test_eht_all_links_rejected(dev, apdev, params):
    """EHT MLD AP with MLD client ignores all rejected links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2",
                                             key_mgmt="SAE", pwe='2')

        link_params['channel'] = '1'
        link_params['bssid'] = '00:11:22:33:44:01'
        hapd0 = eht_mld_enable_ap(hapd_iface, link_params)
        link_params['channel'] = '6'
        link_params['bssid'] = '00:11:22:33:44:02'
        hapd1 = eht_mld_enable_ap(hapd_iface, link_params)

        wpas.set("mld_connect_bssid_pref", "00:11:22:33:44:01")
        wpas.set("sae_pwe", "1")

        # Two failure injections are armed on the first AP instance while
        # the station tries to connect.
        with fail_test(hapd0, 1, "hostapd_get_aid",
                       1, "hostapd_process_assoc_ml_info"):
            wpas.connect(ssid, sae_password=passphrase, ieee80211w="2",
                         key_mgmt="SAE", scan_freq="2412", wait_connect=False)
            if not wpas.wait_event(['CTRL-EVENT-ASSOC-REJECT']):
                raise Exception('Rejection not found')

            # Two "Added BSSID" messages are expected, one per AP address,
            # in either order.
            first = wpas.wait_event(['Added BSSID'])
            second = wpas.wait_event(['Added BSSID'])
            both_seen = bool(first) and bool(second)
            if both_seen:
                in_order = (hapd0.own_addr() in first and
                            hapd1.own_addr() in second)
                both_seen = in_order or (hapd1.own_addr() in first and
                                         hapd0.own_addr() in second)
            if not both_seen:
                raise Exception('Not all BSSs were added to the ignore list')

            # After this message, a new scan clears the ignore and the STA
            # connects.
            wpas.wait_connected(timeout=15)
908
def test_eht_connect_invalid_link(dev, apdev, params):
    """EHT MLD AP where one link is incorrectly configured and rejected by mac80211"""
    # params is the per-test dictionary; only params['logdir'] is read here,
    # to locate the hwsim0.pcapng capture.
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2",
                                             key_mgmt="SAE", pwe='2')
        link_params['channel'] = '1'
        link_params['bssid'] = '00:11:22:33:44:01'
        hapd0 = eht_mld_enable_ap(hapd_iface, link_params)

        link_params['channel'] = '6'
        link_params['bssid'] = '00:11:22:33:44:02'
        hapd1 = eht_mld_enable_ap(hapd_iface, link_params)

        # We scan for both APs, then try to connect to link 0, but only the
        # second attempt will work if mac80211 rejects the second link.
        wpas.set("mld_connect_bssid_pref", "00:11:22:33:44:01")
        wpas.set("sae_pwe", "1")
        with fail_test(wpas, 1, "assoc;wpa_driver_nl80211_associate",
                             2, "link;wpa_driver_nl80211_associate"):
            wpas.connect(ssid, sae_password=passphrase, ieee80211w="2",
                         key_mgmt="SAE", scan_freq="2412")

        # Only the link of bit 0 is expected to be valid and active.
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)

        # Look for an Association Request frame in the capture whose
        # extension element data embeds the station's own address in the
        # fixed byte pattern below.
        out = run_tshark(os.path.join(params['logdir'], 'hwsim0.pcapng'),
                         'wlan.fc.type_subtype == 0x0000 && wlan.ext_tag.data == 00:01:09:%s:00:00' % wpas.own_addr(),
                         display=['frame.number'])
        if not out.splitlines():
            raise Exception('Association request send by mac80211 had unexpected ML element content (probably it contained a second link)')
948
def test_eht_mld_link_removal(dev, apdev):
    """EHT MLD with two links. Links removed during association"""

    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_owe_two_link"
        ap_params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        hapd0 = eht_mld_enable_ap(hapd0_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd0_iface, ap_params)

        # Associate over both links and confirm the starting state.
        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

        logger.info("Disable the 2nd link in 4 beacon intervals")
        hapd1.link_remove(4)
        time.sleep(0.6)

        logger.info("Test traffic after 2nd link disabled")
        traffic_test(wpas, hapd0)

        # Group rekeying is triggered from the remaining AP instance and has
        # to be reported by the station within two seconds.
        if "OK" not in hapd0.request("REKEY_GTK"):
            raise Exception("REKEY_GTK failed")
        if wpas.wait_event(["MLO RSN: Group rekeying completed"],
                           timeout=2) is None:
            raise Exception("GTK rekey timed out")

        traffic_test(wpas, hapd0)

        logger.info("Disable the 1st link in 20 beacon intervals")
        hapd0.link_remove(20)
        time.sleep(1)

        logger.info("Verify that traffic is valid before the link is removed")
        traffic_test(wpas, hapd0)
        time.sleep(2)

        # With the first link gone as well, traffic is expected to fail.
        logger.info("Test traffic after 1st link disabled")
        traffic_test(wpas, hapd0, success=False)
998
def test_eht_mld_bss_trans_mgmt_link_removal_imminent(dev, apdev):
    """EHT MLD with two links. BSS transition management with link removal imminent"""

    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_owe_two_link"
        ap_params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        ap_params["bss_transition"] = "1"
        ap_params["mbo"] = "1"
        hapd0 = eht_mld_enable_ap(hapd0_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd0_iface, ap_params)

        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        for link_ap in (hapd0, hapd1):
            link_ap.wait_sta()
        traffic_test(wpas, hapd0)

        tm_req = "BSS_TM_REQ " + wpas.own_addr()
        tm_req += " disassoc_timer=3 disassoc_imminent=1 link_removal_imminent=1 bss_term=0,1"
        if "OK" not in hapd0.request(tm_req):
            raise Exception("BSS_TM_REQ command failed")

        # Only one link is terminated, so the STA is expected to remain
        # associated and not start a scan.
        tm_resp = hapd0.wait_event(['BSS-TM-RESP'], timeout=5)
        # A missing BSS TM response is tolerated for now since that
        # functionality with MLD needs a recent kernel change; a response
        # that does arrive must carry status_code=0.
        if tm_resp and "status_code=0" not in tm_resp:
            raise Exception("Unexpected BSS TM response contents: " + tm_resp)

        # Any scan start or disconnection within ten seconds is a failure.
        sta_action = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED",
                                      "CTRL-EVENT-DISCONNECTED"], timeout=10)
        if sta_action is not None:
            raise Exception("Unexpected action on STA: " + sta_action)
1047
def send_check(hapd, frame, no_tx_status=False):
    """Inject a hex-encoded management frame and wait for its TX status.

    The frame is handed to hapd through the MGMT_RX_PROCESS control command
    (fixed at freq=2412, datarate=0, ssi_signal=-30). Unless no_tx_status is
    set, a MGMT-TX-STATUS event must follow within one second; otherwise an
    Exception("No TX status") is raised.
    """
    hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" +
                 frame)
    if not no_tx_status and \
       hapd.wait_event(["MGMT-TX-STATUS"], timeout=1) is None:
        raise Exception("No TX status")
1056
def test_eht_ap_mld_proto(dev, apdev):
    """AP MLD protocol testing"""
    # Overview: two AP instances are enabled on the first radio, then a
    # series of hand-built, hex-encoded frames is injected into hapd0 with
    # send_check(). Each scenario uses its own set of station addresses and
    # sends an authentication frame followed by an association request.
    # NOTE(review): hapd1_iface is never used in this function; both AP
    # instances are enabled on hapd0_iface.
    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface):

        ssid = "mld_ap_owe_two_link"
        params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")

        hapd0 = eht_mld_enable_ap(hapd0_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd0_iface, params)

        # Addresses as plain hex (colons removed) so that they can be
        # concatenated into frames. ap_mld_addr and bssid1 are not referenced
        # again below.
        ap_mld_addr = hapd0.get_status_field("mld_addr[0]").replace(':', '')
        bssid0 = hapd0.own_addr().replace(':', '')
        bssid1 = hapd1.own_addr().replace(':', '')

        time.sleep(1)
        # presumably this makes hostapd leave management frame processing to
        # the MGMT_RX_PROCESS injections done by send_check() - TODO confirm
        hapd0.set("ext_mgmt_frame_handling", "1")
        hapd1.set("ext_mgmt_frame_handling", "1")

        # Truncated EML missing MLD Capabilities And operations field
        hapd0.note("Truncated EML missing MLD Capabilities And operations field")
        addr0 = "021122334400"
        addr1 = "021122334401"
        mld_addr = "02112233440f"
        # Authentication frame: header from addr0 to bssid0, three 16-bit
        # fixed fields, then the element held in mle.
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        # Association request pieces shared by all scenarios below. The
        # header uses mld_addr (not addr0) as the transmitter address.
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        # ssid is rebound here to the hex-encoded SSID element: ID 0x00,
        # length 0x13, then the ASCII bytes of "mld_ap_owe_two_link".
        ssid = "00136d6c645f61705f6f77655f74776f5f6c696e6b"
        supp_rates = "010802040b160c121824"
        ext_supp_rates = "32043048606c"
        rsne = "301a0100000fac040100000fac040100000fac12cc000000000fac06"
        ht_capab = "2d1afe131bffff000000000000000000000100000000000000000000"
        ext_capab = "7f0a04004a02014000400001"
        he_capab = "ff16230178c81a400000bfce0000000000000000fafffaff"
        eht_capab = "ff126c07007c0000feffff7f0100888888880000"
        supp_op_classes = "3b155151525354737475767778797a7b7c7d7e7f808182"
        dh_param = "ff23201300ea85e693343a079500cf4d461011a0ff90ec4de1af40165adbea94a3f36eb071"
        wmm = "dd070050f202000100"
        # Each scenario places its own mle between these two halves.
        assocreq_start = "3004" + "0500" + ssid + supp_rates + ext_supp_rates + rsne + ht_capab + ext_capab + he_capab
        assocreq_end = eht_capab + supp_op_classes + dh_param + wmm

        # --> Not enough bytes for common info
        mle = "ff0a6b000109" + mld_addr
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # Truncated Non-Inheritance element
        hapd0.note("Truncated Non-Inheritance element")
        addr0 = "021122334410"
        addr1 = "021122334411"
        mld_addr = "02112233441f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        # --> MLD: Invalid inheritance
        mle = "ff7d6b000109" + mld_addr + "0000"
        # Subelement carrying addr1 followed by a copy of several of the
        # elements used in the main frame body.
        mle += "0067" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inheritance element
        mle += "ff023800"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        # This is the only injection that does not wait for a TX status
        # event. presumably hostapd sends no response to this frame - verify
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end,
                   no_tx_status=True)

        # Empty Non-Inheritance element
        hapd0.note("Empty Non-Inheritance element")
        addr0 = "021122334420"
        addr1 = "021122334421"
        mld_addr = "02112233442f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        # Compared to the previous scenario, the two length bytes (0x7e and
        # 0x68) are one larger, matching the one extra byte in the
        # Non-Inheritance element below.
        mle = "ff7e6b000109" + mld_addr + "0000"
        mle += "0068" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inheritance element
        mle += "ff03380000"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # Non-Inheritance element
        hapd0.note("Non-Inheritance element")
        addr0 = "021122334430"
        addr1 = "021122334431"
        mld_addr = "02112233443f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        mle = "ff9e6b000109" + mld_addr + "0000"
        mle += "0088" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inheritance element with two non-empty lists
        mle += "ff2338" + "1010032a362137387172756b548bedeff0" + "106b01020304050607080c0d21643b3a36"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # No Non-Inheritance element
        hapd0.note("No Non-Inheritance element")
        addr0 = "021122334440"
        addr1 = "021122334441"
        mld_addr = "02112233444f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        # Same subelement contents as above, written as one literal and
        # without any trailing Non-Inheritance, unknown, or vendor-specific
        # parts.
        mle = "ff716b000109" + mld_addr + "0000"
        mle += "0063" + "3100" + "07" + addr1
        mle += "3004010802040b160c12182432043048606c2d1afe131bffff000000000000000000000100000000000000000000ff16230178c81a400000bfce0000000000000000fafffaffff126c07007c0000feffff7f0100888888880000"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)
1191
def _5ghz_chanwidth_to_bw(op):
    """Map an operating channel width value (0-3) to a bandwidth string.

    Values outside the table map to "20".
    """
    widths = {0: "40", 1: "80", 2: "160", 3: "80+80"}
    if op in widths:
        return widths[op]
    return "20"
1199
def _test_eht_5ghz(dev, apdev, channel, chanwidth, ccfs1, ccfs2=0,
                   eht_oper_puncturing_override=None,
                   he_ccfs1=None, he_oper_chanwidth=None):
    """Run one open-network EHT association test on a 5 GHz channel.

    dev[0] is the station and apdev[0] the AP. The AP is configured with
    country code US.

    channel -- primary channel number (frequency is 5000 + channel * 5)
    chanwidth -- value for eht_oper_chwidth
    ccfs1 -- value for eht_oper_centr_freq_seg0_idx
    ccfs2 -- value for the VHT/HE centr_freq_seg1_idx parameters
    eht_oper_puncturing_override -- if set, passed to hostapd as
        eht_oper_puncturing_override and reset to 0x0 at the end of the test
    he_ccfs1 -- VHT/HE centr_freq_seg0_idx; defaults to ccfs1
    he_oper_chanwidth -- VHT/HE oper_chwidth; defaults to chanwidth

    Whatever happens, the regulatory domain is restored with
    set_world_reg() before returning or propagating an exception.
    """
    if he_ccfs1 is None:
        he_ccfs1 = ccfs1
    if he_oper_chanwidth is None:
        he_oper_chanwidth = chanwidth

    # Bound before the try block so the cleanup code can tell whether the AP
    # was ever started.
    hapd = None
    try:
        params = {"ssid": "eht",
                  "country_code": "US",
                  "hw_mode": "a",
                  "channel": str(channel),
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "ieee80211ax": "1",
                  "ieee80211be": "1",
                  "vht_oper_chwidth": str(he_oper_chanwidth),
                  "vht_oper_centr_freq_seg0_idx": str(he_ccfs1),
                  "vht_oper_centr_freq_seg1_idx": str(ccfs2),
                  "he_oper_chwidth": str(he_oper_chanwidth),
                  "he_oper_centr_freq_seg0_idx": str(he_ccfs1),
                  "he_oper_centr_freq_seg1_idx": str(ccfs2),
                  "eht_oper_centr_freq_seg0_idx": str(ccfs1),
                  "eht_oper_chwidth": str(chanwidth)}

        if he_oper_chanwidth == 0:
            # The side of the secondary channel follows from where the
            # center lies relative to the primary channel; equal values
            # leave ht_capab unset.
            if channel < he_ccfs1:
                params["ht_capab"] = "[HT40+]"
            elif channel > he_ccfs1:
                params["ht_capab"] = "[HT40-]"
        else:
            params["ht_capab"] = "[HT40+]"
            if he_oper_chanwidth == 2:
                params["vht_capab"] = "[VHT160]"
            elif he_oper_chanwidth == 3:
                params["vht_capab"] = "[VHT160-80PLUS80]"

        if eht_oper_puncturing_override:
            params['eht_oper_puncturing_override'] = eht_oper_puncturing_override

        freq = 5000 + channel * 5
        # chanwidth 0 with the center on the primary channel is reported as
        # 20 MHz; every other combination goes through the lookup table.
        bw = "20"
        if chanwidth != 0 or channel != ccfs1:
            bw = _5ghz_chanwidth_to_bw(chanwidth)

        hapd = hostapd.add_ap(apdev[0], params)
        dev[0].connect("eht", key_mgmt="NONE", scan_freq=str(freq))
        hapd.wait_sta()

        eht_verify_status(dev[0], hapd, freq, bw, is_ht=True, is_vht=True)
        eht_verify_wifi_version(dev[0])
        hwsim_utils.test_connectivity(dev[0], hapd)

        if eht_oper_puncturing_override:
            hapd.set("eht_oper_puncturing_override", "0x0")
            hapd.request("UPDATE_BEACON")
            time.sleep(1)
    finally:
        try:
            # Without a running AP there was no connection attempt, so there
            # is nothing to disconnect from or wait for. Waiting anyway would
            # raise and hide the original failure.
            if hapd is not None:
                dev[0].request("DISCONNECT")
                dev[0].wait_disconnected()
                hapd.wait_sta_disconnect()
        finally:
            # Always undo the country code setting, even if the disconnect
            # handling above failed.
            set_world_reg(apdev[0], None, dev[0])
1263
def test_eht_5ghz_20mhz(dev, apdev):
    """EHT with 20 MHz channel width on 5 GHz"""
    # Center index equal to the primary channel: no secondary channel.
    _test_eht_5ghz(dev, apdev, channel=36, chanwidth=0, ccfs1=36, ccfs2=0)
1267
def test_eht_5ghz_40mhz_low(dev, apdev):
    """EHT with 40 MHz channel width on 5 GHz - secondary channel above"""
    # Primary channel 36 lies below the center index 38.
    _test_eht_5ghz(dev, apdev, channel=36, chanwidth=0, ccfs1=38, ccfs2=0)
1271
def test_eht_5ghz_40mhz_high(dev, apdev):
    """EHT with 40 MHz channel width on 5 GHz - secondary channel below"""
    # Primary channel 40 lies above the center index 38. Same 40 MHz setup
    # as test_eht_5ghz_40mhz_low, but with the primary on the upper half.
    _test_eht_5ghz(dev, apdev, channel=40, chanwidth=0, ccfs1=38, ccfs2=0)
1275
def test_eht_5ghz_80mhz_1(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=36"""
    # Primary channel 36 with center index 42.
    _test_eht_5ghz(dev, apdev, channel=36, chanwidth=1, ccfs1=42, ccfs2=0)
1279
def test_eht_5ghz_80mhz_2(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149"""
    # Primary channel 149 with center index 155.
    _test_eht_5ghz(dev, apdev, channel=149, chanwidth=1, ccfs1=155, ccfs2=0)
1283
def test_eht_5ghz_80mhz_puncturing_override_1(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=36 - puncturing override (2nd)"""

    # The 2nd 20 MHz is punctured (bitmap 0x0002). The VHT/HE parameters
    # are narrowed to channel width 0 centered on the primary channel.
    _test_eht_5ghz(dev, apdev, channel=36, chanwidth=1, ccfs1=42, ccfs2=0,
                   eht_oper_puncturing_override="0x0002",
                   he_ccfs1=36, he_oper_chanwidth=0)
1291
def test_eht_5ghz_80mhz_puncturing_override_2(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149 - puncturing override (3rd)"""

    # The 3rd 20 MHz is punctured (bitmap 0x0004). The VHT/HE parameters
    # use channel width 0 with center index 151.
    _test_eht_5ghz(dev, apdev, channel=149, chanwidth=1, ccfs1=155, ccfs2=0,
                   eht_oper_puncturing_override="0x0004",
                   he_ccfs1=151, he_oper_chanwidth=0)
1299
def test_eht_5ghz_80mhz_puncturing_override_3(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149 - puncturing override (4th)"""

    # The 4th 20 MHz is punctured (bitmap 0x0008). The VHT/HE parameters
    # use channel width 0 with center index 151.
    _test_eht_5ghz(dev, apdev, channel=149, chanwidth=1, ccfs1=155, ccfs2=0,
                   eht_oper_puncturing_override="0x0008",
                   he_ccfs1=151, he_oper_chanwidth=0)
1307
def test_eht_5ghz_80p80mhz(dev, apdev):
    """EHT with 80+80 MHz channel width on 5 GHz"""
    # First segment centered on index 42, second segment on index 155.
    _test_eht_5ghz(dev, apdev, channel=36, chanwidth=3, ccfs1=42, ccfs2=155)
1311
def _6ghz_op_class_to_bw(op):
    """Map a 6 GHz operating class (131-134, 137) to a bandwidth string.

    Operating classes not in the table map to "20".
    """
    widths = {131: "20", 132: "40", 133: "80", 134: "160", 137: "320"}
    if op in widths:
        return widths[op]
    return "20"
1320
def _test_eht_6ghz(dev, apdev, channel, op_class, ccfs1):
    """Run one SAE-protected EHT association test on a 6 GHz channel.

    dev[0] is the station and apdev[0] the AP. channel is the primary channel
    number (frequency is 5950 + channel * 5), op_class the operating class
    and ccfs1 the value used for both the HE and EHT centr_freq_seg0_idx
    parameters. The test is skipped if 6 GHz, or 320 MHz for operating class
    137, is not supported. The station's sae_pwe setting and the regulatory
    domain are restored at the end.
    """
    sta = dev[0]
    check_sae_capab(sta)

    # CA enables 320 MHz channels without NO-IR restriction
    sta.cmd_execute(['iw', 'reg', 'set', 'CA'])
    wait_regdom_changes(sta)

    try:
        ssid = "eht_6ghz_sae"
        passphrase = "12345678"
        params = hostapd.he_wpa2_params(ssid=ssid, passphrase=passphrase)
        for key, value in (("ieee80211be", "1"),
                           ("channel", str(channel)),
                           ("op_class", str(op_class)),
                           ("he_oper_centr_freq_seg0_idx", str(ccfs1)),
                           ("eht_oper_centr_freq_seg0_idx", str(ccfs1)),
                           ("country_code", "CA")):
            params[key] = value

        if not he_6ghz_supported():
            raise HwsimSkip("6 GHz frequency is not supported")
        if op_class == 137 and not eht_320mhz_supported():
            raise HwsimSkip("320 MHz channels are not supported")

        hapd = hostapd.add_ap(apdev[0], params)
        logger.info("hostapd STATUS: " + str(hapd.get_status()))
        # Both HE and EHT have to be reported as enabled by the AP.
        for field in ("ieee80211ax", "ieee80211be"):
            if hapd.get_status_field(field) != "1":
                raise Exception("STATUS did not indicate %s=1" % field)

        sta.set("sae_pwe", "1")

        freq = 5950 + channel * 5
        bw = _6ghz_op_class_to_bw(op_class)

        sta.connect(ssid, key_mgmt="SAE", psk=passphrase, ieee80211w="2",
                    scan_freq=str(freq))
        hapd.wait_sta()

        eht_verify_status(sta, hapd, freq, bw)
        eht_verify_wifi_version(sta)

        # The AP's view of the station has to list the operating class in
        # use among the station's supported operating classes.
        sta_info = hapd.get_sta(sta.own_addr())
        if 'supp_op_classes' not in sta_info:
            raise Exception("supp_op_classes not indicated")
        if op_class not in binascii.unhexlify(sta_info['supp_op_classes']):
            raise Exception("STA did not indicate support for opclass %d" % op_class)

        hwsim_utils.test_connectivity(sta, hapd)
        sta.request("DISCONNECT")
        sta.wait_disconnected()
        hapd.wait_sta_disconnect()
        hapd.disable()
    finally:
        sta.set("sae_pwe", "0")
        sta.cmd_execute(['iw', 'reg', 'set', '00'])
        wait_regdom_changes(sta)
1379
def test_eht_6ghz_20mhz(dev, apdev):
    """EHT with 20 MHz channel width on 6 GHz"""
    # Operating class 131, center index equal to the primary channel.
    _test_eht_6ghz(dev, apdev, channel=5, op_class=131, ccfs1=5)
1383
def test_eht_6ghz_40mhz(dev, apdev):
    """EHT with 40 MHz channel width on 6 GHz"""
    # Operating class 132 with center index 3.
    _test_eht_6ghz(dev, apdev, channel=5, op_class=132, ccfs1=3)
1387
def test_eht_6ghz_80mhz(dev, apdev):
    """EHT with 80 MHz channel width on 6 GHz"""
    # Operating class 133 with center index 7.
    _test_eht_6ghz(dev, apdev, channel=5, op_class=133, ccfs1=7)
1391
def test_eht_6ghz_160mhz(dev, apdev):
    """EHT with 160 MHz channel width on 6 GHz"""
    # Operating class 134 with center index 15.
    _test_eht_6ghz(dev, apdev, channel=5, op_class=134, ccfs1=15)
1395
def test_eht_6ghz_320mhz(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz"""
    # Operating class 137 with center index 31 and primary channel 5.
    _test_eht_6ghz(dev, apdev, channel=5, op_class=137, ccfs1=31)
1399
def test_eht_6ghz_320mhz_2(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz center 63"""
    # Operating class 137 with center index 63 and primary channel 37.
    _test_eht_6ghz(dev, apdev, channel=37, op_class=137, ccfs1=63)
1403
def test_eht_6ghz_320mhz_3(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz center 31 primary 37"""
    # Same primary channel as test_eht_6ghz_320mhz_2, but center index 31.
    _test_eht_6ghz(dev, apdev, channel=37, op_class=137, ccfs1=31)
1407
def check_anqp(dev, bssid):
    """Run an ANQP query for info ID 258 against bssid and verify it.

    The expected event sequence on dev is GAS-QUERY-START, GAS-QUERY-DONE,
    RX-ANQP (mentioning "Venue Name") and ANQP-QUERY-DONE with
    result=SUCCESS. An Exception is raised at the first step that fails.
    """
    if "OK" not in dev.request("ANQP_GET " + bssid + " 258"):
        raise Exception("ANQP_GET command failed")

    # The first two events only need to show up within their timeouts.
    for event, limit, error in (
            ("GAS-QUERY-START", 5, "GAS query start timed out"),
            ("GAS-QUERY-DONE", 10, "GAS query timed out")):
        if dev.wait_event([event], timeout=limit) is None:
            raise Exception(error)

    rx_anqp = dev.wait_event(["RX-ANQP"], timeout=1)
    if rx_anqp is None or "Venue Name" not in rx_anqp:
        raise Exception("Did not receive Venue Name")

    done = dev.wait_event(["ANQP-QUERY-DONE"], timeout=10)
    if done is None:
        raise Exception("ANQP-QUERY-DONE event not seen")
    if "result=SUCCESS" not in done:
        raise Exception("Unexpected result: " + done)
1429
def test_eht_mld_gas(dev, apdev):
    """GAS/ANQP during MLO association"""
    # A separate AP on channel 11, outside of the AP MLD.
    bssid = apdev[0]['bssid']
    hs20_params = hs20_ap_params()
    hs20_params['hessid'] = bssid
    hs20_params['channel'] = "11"
    hapd = hostapd.add_ap(apdev[0], hs20_params)

    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        wpas.scan_for_bss(bssid, freq="2462")

        ssid = "owe_two_link"
        ap_params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        # Venue information to be returned in the ANQP responses.
        for key, value in (('interworking', "1"),
                           ('venue_group', "7"),
                           ('venue_type', "1"),
                           ('venue_name', "eng:Example venue")):
            ap_params[key] = value
        hapd0 = eht_mld_enable_ap(hapd0_iface, ap_params)
        bssid0 = hapd0.own_addr()

        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd0_iface, ap_params)
        bssid1 = hapd1.own_addr()

        for link_bssid, link_freq in ((bssid0, "2412"), (bssid1, "2437")):
            wpas.scan_for_bss(link_bssid, freq=link_freq)

        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        hapd0.wait_sta()
        hapd1.wait_sta()

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)

        # Query the separate AP first and then each of the two MLD links.
        for target in (bssid, bssid0, bssid1):
            check_anqp(wpas, target)
1473
def test_eht_mld_dpp_responder_while_assoc(dev, apdev):
    """DPP responder while ML associated"""
    check_dpp_capab(dev[0])

    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        check_dpp_capab(wpas)

        ssid = "owe_two_link"
        ap_params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        hapd0 = eht_mld_enable_ap(hapd0_iface, ap_params)
        ap_params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd0_iface, ap_params)

        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        for link_ap in (hapd0, hapd1):
            link_ap.wait_sta()

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)

        # The associated station listens for DPP on 2462 MHz while dev[0]
        # initiates authentication using the generated bootstrapping URI.
        bootstrap_id = wpas.dpp_bootstrap_gen(chan="81/11", mac=True)
        uri = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % bootstrap_id)
        wpas.dpp_listen(2462)
        dev[0].dpp_auth_init(uri=uri)
        wait_auth_success(dev[0], wpas)
1506
def _eht_mld_disconnect(dev, apdev, disassoc=True):
    """Let the AP drop a two-link station three times and check recovery.

    A two-link OWE AP is started and a station is connected to it. The AP is
    then asked three times to disconnect the station; after each request the
    station has to be seen disconnecting and reconnecting on both AP links,
    and status and traffic are verified again.

    dev and apdev are accepted for the test case calling convention but are
    not used. disassoc selects the hostapd command: DISASSOCIATE when true,
    DEAUTHENTICATE otherwise.
    """
    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(wpas_iface)

        ssid = "mld_ap_owe_two_link"
        ap_params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        link0 = eht_mld_enable_ap(hapd0_iface, ap_params)

        ap_params['channel'] = '6'
        link1 = eht_mld_enable_ap(hapd0_iface, ap_params)
        links = (link0, link1)

        def verify_association():
            # Status, Wi-Fi generation, and data connectivity checks that are
            # repeated after every (re)association.
            eht_verify_status(sta, link0, 2412, 20, is_ht=True, mld=True,
                              valid_links=3, active_links=3)
            eht_verify_wifi_version(sta)
            traffic_test(sta, link0)

        sta.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                    ieee80211w="2")
        for link in links:
            link.wait_sta()
        verify_association()

        verb = "DISASSOCIATE " if disassoc else "DEAUTHENTICATE "
        cmd = verb + sta.own_addr()

        for _ in range(3):
            time.sleep(1)

            if "OK" not in link0.request(cmd):
                raise Exception("Failed to request: " + cmd)
            for link in links:
                link.wait_sta_disconnect()

            # The station is expected to notice the disconnection and to
            # reconnect on its own.
            sta.wait_disconnected(timeout=1)
            sta.wait_connected(timeout=5)
            for link in links:
                link.wait_sta()

            verify_association()
1550
def test_eht_mld_disassociate(dev, apdev):
    """EHT MLD with two links. Disassociate and reconnect"""
    # Third argument is disassoc: the AP sends DISASSOCIATE.
    _eht_mld_disconnect(dev, apdev, True)
1554
def test_eht_mld_deauthenticate(dev, apdev):
    """EHT MLD with two links. Deauthenticate and reconnect"""
    # Third argument is disassoc: the AP sends DEAUTHENTICATE instead.
    _eht_mld_disconnect(dev, apdev, False)
1558
def test_eht_mld_non_pref_chan(dev, apdev):
    """EHT MLD with one link. MBO non preferred channels"""

    def verify_non_pref_chan(sta_info, expected, stage):
        # sta_info must contain exactly len(expected) non_pref_chan[N]
        # entries with the listed values; stage only tags the error text.
        for idx, value in enumerate(expected):
            key = 'non_pref_chan[%d]' % idx
            if key not in sta_info:
                raise Exception("Missing %s value (%s)" % (key, stage))
            if sta_info[key] != value:
                raise Exception("Unexpected %s value (%s)" % (key, stage))
        extra = 'non_pref_chan[%d]' % len(expected)
        if extra in sta_info:
            raise Exception("Unexpected %s value (%s)" % (extra, stage))

    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        sta_dev = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta_dev.interface_add(wpas_iface)

        # Single-link AP with BSS transition and MBO enabled
        ssid = "mld_ap_one_link_mbo"
        ap_params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        ap_params["bss_transition"] = "1"
        ap_params["mbo"] = "1"
        ap = eht_mld_enable_ap(hapd0_iface, ap_params)

        # Configure the list before connecting so that it is reported as
        # part of the association.
        if "OK" not in sta_dev.request("SET non_pref_chan 81:7:200:1 81:9:100:2"):
            raise Exception("Failed to set non-preferred channel list")

        sta_dev.connect(ssid, scan_freq="2412", key_mgmt="OWE",
                        ieee80211w="2", owe_only="1")
        ap.wait_sta()
        eht_verify_status(sta_dev, ap, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(sta_dev)
        traffic_test(sta_dev, ap)

        # What the AP learned from the Association Request frame
        addr = sta_dev.own_addr()
        sta_info = ap.get_sta(addr)
        logger.debug("STA: " + str(sta_info))
        verify_non_pref_chan(sta_info, ['81:200:1:7', '81:100:2:9'], "assoc")

        # Supported operating classes: first octet is checked as the current
        # operating class and class 115 has to be listed.
        if 'supp_op_classes' not in sta_info:
            raise Exception("No supp_op_classes")
        op_classes = bytearray(binascii.unhexlify(sta_info['supp_op_classes']))
        current = op_classes[0]
        if current != 81:
            raise Exception("Unexpected current operating class %d" % current)
        if 115 not in op_classes:
            raise Exception("Operating class 115 missing")

        # Updates while associated (WNM action): first a single entry, then
        # multiple entries.
        updates = (
            ("SET non_pref_chan 81:9:100:2",
             ['81:100:2:9'],
             "update 1"),
            ("SET non_pref_chan 81:9:100:2 81:10:100:2 81:8:100:2 81:7:100:1 81:5:100:1",
             ['81:100:1:7,5', '81:100:2:9,10,8'],
             "update 2"),
        )
        for cmd, expected, stage in updates:
            if "OK" not in sta_dev.request(cmd):
                raise Exception("Failed to update non-preferred channel list")

            # Give the AP a moment to process the update before polling
            time.sleep(0.1)
            sta_info = ap.get_sta(addr)
            logger.debug("STA: " + str(sta_info))
            verify_non_pref_chan(sta_info, expected, stage)
1644
def test_eht_mld_rrm_beacon_req(dev, apdev):
    """EHT MLD with one link. RRM beacon request"""

    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(wpas_iface)

        # Serving AP: single link with beacon reports enabled
        serving_ssid = "mld_ap_one_link_rrm1"
        serving_params = eht_mld_ap_wpa2_params(serving_ssid, key_mgmt="OWE",
                                                mfp="2")
        for option in ("bss_transition", "mbo", "rrm_beacon_report"):
            serving_params[option] = "1"

        serving_ap = eht_mld_enable_ap(hapd0_iface, serving_params)

        sta.connect(serving_ssid, scan_freq="2412", key_mgmt="OWE",
                    ieee80211w="2", owe_only="1")
        serving_ap.wait_sta()
        eht_verify_status(sta, serving_ap, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(sta)
        traffic_test(sta, serving_ap)

        # Second AP on channel 6; this is the one the report has to describe
        neighbor_params = eht_mld_ap_wpa2_params("other", key_mgmt="OWE",
                                                 mfp="2")
        neighbor_params["channel"] = '6'
        neighbor_ap = eht_mld_enable_ap(hapd1_iface, neighbor_params)

        addr = sta.own_addr()
        base_req = build_beacon_request(mode=1, chan=6, duration=50)

        # Subelements appended to the request, in order: SSID, Detail,
        # Last Beacon Report Indication, and Extended Request. The Extended
        # Request includes the Multi-Link element ID.
        subelems = ("0000", "020101", "a40101", "0b02ff6b")
        run_req_beacon(serving_ap, addr, base_req + "".join(subelems))

        ev = serving_ap.wait_event(["BEACON-RESP-RX"], timeout=3)
        if ev is None:
            raise Exception("Beacon report response not received")

        # The fifth space-separated field of the event is the hexdump that
        # is parsed as the report.
        report_hex = ev.split(' ')[4]
        report = BeaconReport(binascii.unhexlify(report_hex))
        logger.info("Received beacon report: " + str(report))
        if not (report.bssid_str == neighbor_ap.own_addr() and
                report.opclass == 81 and report.channel == 6):
            raise Exception("Incorrect bssid/op class/channel for hapd1")

        if not report.last_indication:
            raise Exception("Last Beacon Report Indication subelement missing")
1701
def test_eht_mld_legacy_stas(dev, apdev):
    """EHT AP MLD and multiple non-MLD STAs"""
    stas = [dev[i] for i in range(3)]
    for sta in stas:
        check_sae_capab(sta)

    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface):
        ssid = "ap_mld_sae"
        password = 'qwertyuiop'
        ap_params = eht_mld_ap_wpa2_params(ssid, password,
                                           key_mgmt="SAE SAE-EXT-KEY",
                                           mfp="2", pwe='2')
        ap_params['rsn_pairwise'] = "CCMP GCMP-256"
        ap_params['sae_groups'] = "19 20"
        ap = eht_mld_enable_ap(hapd_iface, ap_params)

        # Every station connects with EHT disabled.
        for sta in stas:
            sta.set("sae_groups", "")
            sta.connect(ssid, sae_password=password, scan_freq="2412",
                        key_mgmt="SAE", ieee80211w="2", disable_eht="1")

        # One wait per connected station
        for _ in stas:
            ap.wait_sta()

        aids = []
        for sta in stas:
            aids.append(int(ap.get_sta(sta.own_addr())['aid']))
            traffic_test(sta, ap)
        logger.info("Assigned AIDs: " + str(aids))

        # Duplicates collapse in the set, so anything below three distinct
        # values means an AID was handed out twice.
        if len(set(aids)) != 3:
            raise Exception("AP did not assign unique AID to each STA")
1731
def test_eht_mld_and_mlds(dev, apdev):
    """EHT AP MLD and multiple non-AP MLDs"""
    check_sae_capab(dev[0])

    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
            HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface), \
            HWSimRadio(use_mlo=True) as (wpas_radio2, wpas_iface2):
        # Two stations, each on its own MLO-capable radio
        mlds = []
        for iface in (wpas_iface, wpas_iface2):
            mld = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
            mld.interface_add(iface)
            check_sae_capab(mld)
            mlds.append(mld)

        ssid = "ap_mld_sae"
        password = 'qwertyuiop'
        ap_params = eht_mld_ap_wpa2_params(ssid, password,
                                           key_mgmt="SAE SAE-EXT-KEY",
                                           mfp="2", pwe='2')
        ap_params['rsn_pairwise'] = "CCMP GCMP-256"
        ap_params['sae_groups'] = "19 20"
        ap = eht_mld_enable_ap(hapd_iface, ap_params)

        for mld in mlds:
            mld.set("sae_pwe", "1")
            mld.connect(ssid, sae_password=password, scan_freq="2412",
                        key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        # One wait per connected station
        for _ in mlds:
            ap.wait_sta()

        aids = []
        for mld in mlds:
            aids.append(int(ap.get_sta(mld.own_addr())['aid']))
            traffic_test(mld, ap)
        logger.info("Assigned AIDs: " + str(aids))

        # A set drops duplicates; fewer than two values means a shared AID.
        if len(set(aids)) != 2:
            raise Exception("AP MLD did not assign unique AID to each non-AP MLD")
1773
def mlo_perform_csa(hapd, command, freq, dev):
    """Issue a channel switch command and wait for it to be reported.

    command is sent to hapd as-is. Three events are then awaited from hapd
    and one from dev, each with a 10 second timeout, and every event has to
    contain "freq=<freq>". An Exception is raised if an event is missing or
    does not include that string. The function sleeps for half a second
    before returning.
    """
    expected = "freq=" + str(freq)
    hapd.request(command)

    # (event source, event name, error if not seen, error on wrong channel)
    steps = (
        (hapd, "CTRL-EVENT-STARTED-CHANNEL-SWITCH",
         "Channel switch start event not seen",
         "Unexpected channel in CS started"),
        (hapd, "CTRL-EVENT-CHANNEL-SWITCH",
         "Channel switch completion event not seen",
         "Unexpected channel in CS completed"),
        (hapd, "AP-CSA-FINISHED",
         "CSA finished event timed out",
         "Unexpected channel in CSA finished event"),
        (dev, "CTRL-EVENT-LINK-CHANNEL-SWITCH",
         "Non-AP MLD did not report CS",
         "Unexpected channel in CS event from non-AP MLD"),
    )
    for source, event, missing_msg, mismatch_msg in steps:
        ev = source.wait_event([event], timeout=10)
        if ev is None:
            raise Exception(missing_msg)
        if expected not in ev:
            raise Exception(mismatch_msg)

    time.sleep(0.5)
1803
def test_eht_mlo_csa(dev, apdev):
    """EHT MLD AP connected to non-AP MLD. Seamless channel switch"""
    csa_supported(dev[0])

    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'

        # Two AP links that differ only in channel
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                           key_mgmt="SAE", mfp="2", pwe='1')
        link0 = eht_mld_enable_ap(hapd_iface, ap_params)

        ap_params['channel'] = '6'
        link1 = eht_mld_enable_ap(hapd_iface, ap_params)

        sta.set("sae_pwe", "1")
        sta.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                    key_mgmt="SAE", ieee80211w="2")

        eht_verify_status(sta, link0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(sta)
        traffic_test(sta, link0)

        # Move the first link to 2462 MHz and then back to 2412 MHz,
        # checking traffic after each switch.
        csa_rounds = (
            (2462,
             "Perform CSA on 1st link",
             "Test traffic after 1st link CSA completes"),
            (2412,
             "Perform CSA on 1st link and bring it back to original channel",
             "Test traffic again after 1st link CSA completes"),
        )
        for freq, csa_msg, traffic_msg in csa_rounds:
            logger.info(csa_msg)
            mlo_perform_csa(link0,
                            "CHAN_SWITCH 5 %d ht he eht blocktx" % freq,
                            freq, sta)

            logger.info(traffic_msg)
            traffic_test(sta, link0)

        #TODO: CSA on non-first link
1848
def create_base_conf_file(iface, channel, prefix='hostapd-', hw_mode='g',
                          op_class=None):
    """Create a temporary hostapd configuration file with the PHY settings.

    The file is created in /tmp with a name starting with
    "<prefix><iface>-chan-<channel>-". The driver, hw_mode, 802.11n/ax/be
    and channel lines are written to it; ieee80211ac=1 is added only for
    hw_mode 'a' when op_class is None or outside 131..137.

    Returns (f, fname): the file object, still open for writing so that BSS
    sections can be appended, and the path of the file. Closing and removing
    the file is left to the caller.
    """
    name_prefix = "".join([prefix, iface, "-chan-", str(channel), "-"])
    fd, fname = tempfile.mkstemp(dir='/tmp', prefix=name_prefix)
    conf = os.fdopen(fd, 'w')

    # NOTE(review): presumably the 6 GHz operating classes, where VHT is not
    # used - confirm.
    no_vht_op_classes = (131, 132, 133, 134, 135, 136, 137)
    skip_vht = op_class is not None and op_class in no_vht_op_classes

    lines = [
        "driver=nl80211\n",
        "hw_mode=" + str(hw_mode) + "\n",
        "ieee80211n=1\n",
    ]
    if hw_mode == 'a' and not skip_vht:
        lines.append("ieee80211ac=1\n")
    lines.extend([
        "ieee80211ax=1\n",
        "ieee80211be=1\n",
        "channel=" + str(channel) + "\n",
    ])
    conf.writelines(lines)

    return conf, fname
1868
def append_bss_conf_to_file(f, ifname, params, first=False):
    """Append one BSS section to an open hostapd configuration file.

    The section starts with "interface=<ifname>" when first is true and
    "bss=<ifname>" otherwise, followed by one "key=value" line for each
    item in params and a final "mld_ap=1" line.
    """
    section = "interface" if first else "bss"
    f.write("\n%s=%s\n" % (section, ifname))

    f.writelines("{}={}\n".format(key, value)
                 for key, value in params.items())

    f.write("mld_ap=1\n")
1882
def dump_config(fname):
    """Write the contents of the file fname to the debug log."""
    with open(fname, 'r') as conf:
        contents = conf.read()
    logger.debug("hostapd config: " + str(fname) + "\n" + contents)
1887
def get_config(iface, count, ssid, passphrase, channel, bssid_regex,
               rnr=False, debug=False):
    """Write a hostapd configuration file with count SAE BSSs on one channel.

    The first BSS uses iface as its interface name and the following ones
    use "<iface>-<i>". BSS number i gets SSID ssid + str(i), passphrase
    passphrase + str(i), and the BSSID bssid_regex % (i + 1). Despite its
    name, bssid_regex is used as a %-format string taking one integer. All
    BSSs use the control interface directory
    /var/run/hostapd/chan_<channel>. rnr adds rnr=1 to every BSS and debug
    logs the resulting file.

    Returns (fname, hapds) where fname is the path of the configuration file
    and hapds is a list of [ifname, ctrl_interface, index] entries, one per
    BSS. The caller is responsible for removing the file.

    If anything fails while the file is being built, the file is closed and
    removed before the exception is passed on so that it is not left behind
    in /tmp.
    """
    f, fname = create_base_conf_file(iface, channel=channel)
    hapds = []

    try:
        # The context manager closes the file even if writing a BSS fails.
        with f:
            for i in range(count):
                ifname = iface if i == 0 else iface + "-" + str(i)

                params = hostapd.wpa2_params(ssid=ssid + str(i),
                                             passphrase=passphrase + str(i),
                                             wpa_key_mgmt="SAE",
                                             ieee80211w="2")
                params['sae_pwe'] = "2"
                params['group_mgmt_cipher'] = "AES-128-CMAC"
                params['beacon_prot'] = "1"
                params["ctrl_interface"] = "/var/run/hostapd/chan_" + str(channel)
                params["bssid"] = bssid_regex % (i + 1)

                if rnr:
                    params["rnr"] = "1"

                append_bss_conf_to_file(f, ifname, params, first=(i == 0))

                hapds.append([ifname, params["ctrl_interface"], i])

        if debug:
            dump_config(fname)
    except Exception:
        # Nobody gets the file name back on failure, so clean up here.
        os.remove(fname)
        raise

    return fname, hapds
1922
def start_ap(prefix, configs):
    """Start a hostapd process in the background and wait for its PID file.

    prefix is used for the PID file (<prefix>.hostapd.pid) and the log file
    (<prefix>.hostapd-log). configs is a whitespace separated list of
    configuration file names.

    Raises an Exception if hostapd exits with a non-zero status or if the
    PID file does not show up.
    """
    pid = prefix + ".hostapd.pid"

    cmd = ['../../hostapd/hostapd', '-ddKtB', '-P', pid, '-f',
           prefix + ".hostapd-log"] + configs.split()

    logger.info("Starting APs")
    # subprocess.call() returns the exit status. check_call() would raise
    # CalledProcessError on its own, which made the check below unreachable.
    res = subprocess.call(cmd)
    if res != 0:
        raise Exception("Could not start hostapd: %s" % str(res))

    # Wait for hostapd to complete initialization and daemonize.
    time.sleep(2)
    for _ in range(20):
        if os.path.exists(pid):
            break
        time.sleep(0.2)

    if not os.path.exists(pid):
        raise Exception("hostapd did not create PID file.")
1946
def get_mld_devs(hapd_iface, count, prefix, rnr=False):
    """Start hostapd with two configuration files and return four BSS handles.

    One configuration file is generated for channel 1 and one for channel 6,
    each with count BSSs (see get_config()). A single hostapd process is
    started with both files and control interface handles are opened for the
    first two BSSs of each file, so count has to be at least 2.

    Returns [mld1_link0, mld1_link1, mld2_link0, mld2_link1], where link0 is
    the BSS from the channel 1 file and link1 the BSS with the same index
    from the channel 6 file.

    The temporary configuration files are removed before returning, also
    when starting or pinging hostapd fails; their contents are already in
    the debug log since get_config() is called with debug=True.
    """
    # (channel, BSSID format) of the two configuration files
    links = ((1, "02:00:00:00:07:%02x"), (6, "02:00:00:00:08:%02x"))

    conf_files = []
    try:
        bss_lists = []
        for channel, bssid_fmt in links:
            fname, bss_list = get_config(hapd_iface, count=count, ssid="mld-",
                                         passphrase="qwertyuiop-",
                                         channel=channel,
                                         bssid_regex=bssid_fmt,
                                         rnr=rnr, debug=True)
            conf_files.append(fname)
            bss_lists.append(bss_list)

        start_ap(prefix, " ".join(conf_files))

        hapds = []
        for mld in range(2):
            for bss_list in bss_lists:
                ifname, ctrl, bssidx = bss_list[mld]
                hapds.append(hostapd.Hostapd(ifname=ifname, ctrl=ctrl,
                                             bssidx=bssidx))

        for hapd in hapds:
            if not hapd.ping():
                raise Exception("Could not ping hostapd")
    finally:
        for fname in conf_files:
            os.remove(fname)

    return hapds
1985
def stop_mld_devs(hapds, pid):
    """Terminate the hostapd process and wait for its PID file to go away.

    TERMINATE is sent through hapds[0] only. pid is the prefix of the PID
    file, i.e., the file checked is <pid>.hostapd.pid.

    Raises an Exception if the command is rejected, the terminating event is
    not seen within 15 seconds, or the PID file is still there after the
    wait.
    """
    pid_file = pid + ".hostapd.pid"
    ctrl = hapds[0]

    if "OK" not in ctrl.request("TERMINATE"):
        raise Exception("Failed to terminate hostapd process")

    ev = ctrl.wait_event(["CTRL-EVENT-TERMINATING"], timeout=15)
    if ev is None:
        raise Exception("CTRL-EVENT-TERMINATING not seen")

    time.sleep(0.5)

    # Poll for the PID file to disappear, 30 times at 0.1 second intervals.
    for _ in range(30):
        time.sleep(0.1)
        if not os.path.exists(pid_file):
            break
    if os.path.exists(pid_file):
        raise Exception("PID file exists after process termination")
2004
def eht_parse_rnr(bss, rnr=False, exp_bssid=None):
    """Check the RNR and Multi-Link information in a BSS entry dump.

    bss is the multi-line text of a BSS entry. It has to contain an ap_info
    line with "mld ID=0" and a multi-link line with an MLD address. When rnr
    is true, the first ap_info line with "mld ID=255" is checked as well: it
    may mention only one bssid, its link ID has to be 15, and its BSSID has
    to be exp_bssid.

    Raises an Exception describing the first check that fails.
    """
    def field_after(text, key):
        # Value after the last occurrence of key: one separator character
        # is skipped and the value ends at the next comma.
        begin = text.rindex(key) + len(key) + 1
        return text[begin:].split(',')[0]

    if re.search(".*ap_info.*, mld ID=0, link ID=", bss,
                 re.MULTILINE) is None:
        raise Exception("RNR element not found for first link of first MLD")

    if re.search(".*multi-link:.*, MLD addr=.*", bss, re.MULTILINE) is None:
        raise Exception("ML element not found for first link of first MLD")

    if not rnr:
        return

    coloc = re.search(".*ap_info.*, mld ID=255, link ID=..", bss,
                      re.MULTILINE)
    if coloc is None:
        raise Exception("RNR element not found for co-located BSS")

    # The match ends two characters after "link ID=".
    entry = coloc.group()
    if entry.count('bssid') > 1:
        raise Exception("More than one BSS found for co-located RNR")

    # BSSID and link ID carried in the RNR entry
    bssid = field_after(entry, 'bssid')
    link_id = field_after(entry, 'link ID')

    if link_id != "15":
        raise Exception("Unexpected link ID for co-located BSS which is not own partner")

    if bssid != exp_bssid:
        raise Exception("Unexpected BSSID for co-located BSS")
2042
def eht_mld_cohosted_discovery(dev, apdev, params, rnr=False):
    """Scan one channel and check the scan results of two co-hosted APs.

    hostapd is started through get_mld_devs() with two BSSs per
    configuration file and params['prefix'] as the PID/log file prefix. The
    station scans 2412 MHz only and the BSS entries of the two channel 1
    BSSs are checked with eht_parse_rnr(); with rnr=True each entry also has
    to carry an RNR entry pointing to the other BSS.

    dev and apdev are accepted for the test case calling convention but are
    not used. The hostapd process is stopped also when a check fails so that
    it does not stay around after the test case.
    """
    with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        hapds = get_mld_devs(hapd_iface=hapd_iface, count=2,
                             prefix=params['prefix'], rnr=rnr)

        try:
            # Only scan link 0
            res = wpas.request("SCAN freq=2412")
            if "FAIL" in res:
                raise Exception("Failed to start scan")

            ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
            if ev is None:
                raise Exception("Scan did not start")

            ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
            if ev is None:
                raise Exception("Scan did not complete")

            logger.info("Scan done")

            # hapds[0] and hapds[2] are the channel 1 BSSs of the first and
            # second MLD; each is expected to point to the other one.
            bss = wpas.request("BSS " + hapds[0].own_addr())
            logger.info("BSS 0_0: " + str(bss))
            eht_parse_rnr(bss, rnr, hapds[2].own_addr())

            bss = wpas.request("BSS " + hapds[2].own_addr())
            logger.info("BSS 1_0: " + str(bss))
            eht_parse_rnr(bss, rnr, hapds[0].own_addr())
        except Exception:
            # Best-effort cleanup; the original failure is what gets
            # reported.
            try:
                stop_mld_devs(hapds, params['prefix'])
            except Exception as e:
                logger.info("Failed to stop hostapd during cleanup: " + str(e))
            raise

        stop_mld_devs(hapds, params['prefix'])
2077
def test_eht_mld_cohosted_discovery(dev, apdev, params):
    """EHT 2 AP MLDs discovery"""
    # Plain discovery: the co-location RNR checks stay disabled.
    eht_mld_cohosted_discovery(dev, apdev, params, rnr=False)
2081
def test_eht_mld_cohosted_discovery_with_rnr(dev, apdev, params):
    """EHT 2 AP MLDs discovery (with co-location RNR)"""
    # Fourth argument is rnr: enable and verify the co-location RNR.
    eht_mld_cohosted_discovery(dev, apdev, params, True)
2085
def test_eht_mld_cohosted_connectivity(dev, apdev, params):
    """EHT 2 AP MLDs with 2 MLD clients connection"""
    check_sae_capab(dev[0])

    with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio1, wpas_iface1):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        check_sae_capab(wpas)

        wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas1.interface_add(wpas_iface1)
        check_sae_capab(wpas1)

        hapds = get_mld_devs(hapd_iface=hapd_iface, count=2,
                             prefix=params['prefix'], rnr=False)

        # get_mld_devs() appends the BSS index to these for each MLD
        passphrase = "qwertyuiop-"
        ssid = "mld-"

        try:
            # Connect one client to first AP MLD and verify traffic on both
            # links
            wpas.set("sae_pwe", "1")
            wpas.connect(ssid + "0", sae_password=passphrase+"0",
                         scan_freq="2412", key_mgmt="SAE", ieee80211w="2")

            eht_verify_status(wpas, hapds[0], 2412, 20, is_ht=True, mld=True,
                              valid_links=3, active_links=3)
            eht_verify_wifi_version(wpas)

            traffic_test(wpas, hapds[0])
            traffic_test(wpas, hapds[1])

            # Connect another client to second AP MLD and verify traffic on
            # both links
            wpas1.set("sae_pwe", "1")
            wpas1.connect(ssid + "1", sae_password=passphrase+"1",
                          scan_freq="2437", key_mgmt="SAE", ieee80211w="2")

            eht_verify_status(wpas1, hapds[3], 2437, 20, is_ht=True, mld=True,
                              valid_links=3, active_links=3)
            eht_verify_wifi_version(wpas1)

            traffic_test(wpas1, hapds[3])
            traffic_test(wpas1, hapds[2])
        except Exception:
            # Do not leave the hostapd process behind for the following test
            # cases. Cleanup is best-effort so that the original failure is
            # the one reported.
            try:
                stop_mld_devs(hapds, params['prefix'])
            except Exception as e:
                logger.info("Failed to stop hostapd during cleanup: " + str(e))
            raise

        stop_mld_devs(hapds, params['prefix'])
2134
def test_eht_mlo_color_change(dev, apdev):
    """AP MLD and Color Change Announcement"""
    # No station is connected in this test; the second radio is created but
    # its interface is not used below.
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'

        # First link starts with BSS color 42
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE", mfp="2", pwe='1')
        params['he_bss_color'] = '42'

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        # Second link: same parameters apart from channel 6 and BSS color 24
        params['channel'] = '6'
        params['he_bss_color'] = '24'

        hapd1 = eht_mld_enable_ap(hapd_iface, params)

        # Each step below requests a new color, waits 1.5 seconds for the
        # change to be applied, and then reads the color back from STATUS.
        logger.info("Perform CCA on 1st link")
        if "OK" not in hapd0.request("COLOR_CHANGE 10"):
            raise Exception("COLOR_CHANGE failed")

        time.sleep(1.5)

        color = hapd0.get_status_field("he_bss_color")
        if color != "10":
            raise Exception("Expected current he_bss_color to be 10; was " + color)

        logger.info("Perform CCA on 1st link again")
        if "OK" not in hapd0.request("COLOR_CHANGE 60"):
            raise Exception("COLOR_CHANGE failed")
        time.sleep(1.5)

        color = hapd0.get_status_field("he_bss_color")
        if color != "60":
            raise Exception("Expected current he_bss_color to be 60; was " + color)

        # Same two-step sequence on the second link
        logger.info("Perform CCA on 2nd link")
        if "OK" not in hapd1.request("COLOR_CHANGE 25"):
            raise Exception("COLOR_CHANGE failed")
        time.sleep(1.5)

        color = hapd1.get_status_field("he_bss_color")
        if color != "25":
            raise Exception("Expected current he_bss_color to be 25; was " + color)

        logger.info("Perform CCA on 2nd link again")
        if "OK" not in hapd1.request("COLOR_CHANGE 5"):
            raise Exception("COLOR_CHANGE failed")
        time.sleep(1.5)

        color = hapd1.get_status_field("he_bss_color")
        if color != "5":
            raise Exception("Expected current he_bss_color to be 5; was " + color)

        # Flush pending events from both links
        hapd0.dump_monitor()
        hapd1.dump_monitor()
2193