1# WPA2-Personal OCV tests
2# Copyright (c) 2018, Mathy Vanhoef
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details
6
7from remotehost import remote_compatible
8import binascii, struct
9import logging, time
10logger = logging.getLogger()
11
12import hostapd
13from wpasupplicant import WpaSupplicant
14import hwsim_utils
15from utils import *
16from test_erp import start_erp_as
17from test_ap_ft import ft_params1, ft_params2
18from test_ap_psk import parse_eapol, build_eapol, pmk_to_ptk, eapol_key_mic, recv_eapol, send_eapol, reply_eapol, build_eapol_key_3_4, aes_wrap, pad_key_data
19
20#TODO: Refuse setting up AP with OCV but without MFP support
21#TODO: Refuse to connect to AP that advertises OCV but not MFP
22
def make_ocikde(op_class, channel, seg1_idx):
    """Build an OCI KDE for the given operating channel information.

    The result is a vendor specific element (ID 221) whose payload is the
    selector 00-0F-AC:0D followed by one octet each for op_class, channel
    and seg1_idx. Each of the three values has to fit into a single
    unsigned octet; struct.error is raised otherwise.
    """
    vendor_specific_eid = 221
    oci_selector = b"\x00\x0f\xac\x0d"

    body = oci_selector + struct.pack("<3B", op_class, channel, seg1_idx)
    header = struct.pack("<BB", vendor_specific_eid, len(body))
    return header + body
31
def ocv_setup_ap(apdev, params):
    """Start a WPA2-PSK AP with the fixed SSID/passphrase of the OCV tests.

    The WPA2 settings are merged into params, i.e., the dictionary passed in
    by the caller is updated in place, before the AP is added. A failure to
    set the hostapd "ocv" parameter is converted into HwsimSkip; any other
    exception is re-raised as is.

    Returns the tuple (hapd, ssid, passphrase).
    """
    ssid, passphrase = "test-wpa2-ocv", "qwertyuiop"
    wpa2 = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    params.update(wpa2)
    try:
        hapd = hostapd.add_ap(apdev, params)
    except Exception as e:
        if "Failed to set hostapd parameter ocv" not in str(e):
            raise
        raise HwsimSkip("OCV not supported")
    return hapd, ssid, passphrase
43
def build_eapol_key_1_2(kck, key_data, replay_counter=3, key_info=0x1382,
                        extra_len=0, descr_type=2, key_len=16):
    """Build the field dictionary of an EAPOL-Key group handshake frame.

    key_data is placed into the frame without any processing; the nonce, IV,
    RSC and key ID fields are all zero. extra_len is added to the advertised
    frame length only (it does not add any payload). The dictionary is
    handed to eapol_key_mic() together with kck before it is returned.
    """
    msg = {
        'version': 2,
        'type': 3,
        # NOTE(review): 95 looks like the fixed size of the EAPOL-Key
        # descriptor without Key Data - confirm against build_eapol().
        'length': 95 + len(key_data) + extra_len,
        'descr_type': descr_type,
        'rsn_key_info': key_info,
        'rsn_key_len': key_len,
        'rsn_replay_counter': struct.pack('>Q', replay_counter),
        'rsn_key_nonce': b'\x00' * 32,
        'rsn_key_iv': b'\x00' * 16,
        'rsn_key_rsc': b'\x00' * 8,
        'rsn_key_id': b'\x00' * 8,
        'rsn_key_data_len': len(key_data),
        'rsn_key_data': key_data,
    }
    eapol_key_mic(kck, msg)
    return msg
63
def build_eapol_key_2_2(kck, key_data, replay_counter=3, key_info=0x0302,
                        extra_len=0, descr_type=2, key_len=16):
    """Build a group handshake reply; same as build_eapol_key_1_2() except
    for the default Key Information value (0x0302)."""
    return build_eapol_key_1_2(kck, key_data,
                               replay_counter=replay_counter,
                               key_info=key_info,
                               extra_len=extra_len,
                               descr_type=descr_type,
                               key_len=key_len)
68
@remote_compatible
def test_wpa2_ocv(dev, apdev):
    """OCV on 2.4 GHz"""
    ap_params = {"channel": "1",
                 "ieee80211w": "2",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]
    # Associate once with ocv=0 and once with ocv=1 on the station
    for sta_ocv in ("0", "1"):
        sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv=sta_ocv,
                    ieee80211w="1")
        sta.request("REMOVE_NETWORK all")
        sta.wait_disconnected()
81
@remote_compatible
def test_wpa2_ocv_5ghz(dev, apdev):
    """OCV on 5 GHz"""
    try:
        run_wpa2_ocv_5ghz(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        sta.flush_scan_cache()
90
def run_wpa2_ocv_5ghz(dev, apdev):
    # Body of test_wpa2_ocv_5ghz; the caller takes care of cleanup.
    ap_params = {"hw_mode": "a",
                 "channel": "40",
                 "ieee80211w": "2",
                 "country_code": "US",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]
    # Associate once with ocv=0 and once with ocv=1 on the station
    for sta_ocv in ("0", "1"):
        sta.connect(ssid, psk=passphrase, scan_freq="5200", ocv=sta_ocv,
                    ieee80211w="1")
        sta.wait_regdom(country_ie=True)
        sta.request("REMOVE_NETWORK all")
        sta.wait_disconnected()
104
@remote_compatible
def test_wpa2_ocv_ht20(dev, apdev):
    """OCV with HT20 channel"""
    ap_params = {"channel": "6",
                 "ieee80211n": "1",
                 "ieee80211w": "1",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    # dev[0] associates with HT disabled, dev[1] with default settings
    for sta_ocv in ("0", "1"):
        dev[0].connect(ssid, psk=passphrase, scan_freq="2437", ocv=sta_ocv,
                       ieee80211w="1", disable_ht="1")
        dev[1].connect(ssid, psk=passphrase, scan_freq="2437", ocv=sta_ocv,
                       ieee80211w="1")
        for idx in range(2):
            dev[idx].request("REMOVE_NETWORK all")
        for idx in range(2):
            dev[idx].wait_disconnected()
122
@remote_compatible
def test_wpa2_ocv_ht40(dev, apdev):
    """OCV with HT40 channel"""
    try:
        run_wpa2_ocv_ht40(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        set_world_reg(apdev[0], apdev[1], dev[0])
        for idx in range(2):
            dev[idx].flush_scan_cache()
132
def run_wpa2_ocv_ht40(dev, apdev):
    # Body of test_wpa2_ocv_ht40; the caller takes care of cleanup.
    clear_scan_cache(apdev[0])
    # (channel, ht_capab, station scan frequency, hw_mode)
    setups = [("6", "[HT40-]", "2437", "g"),
              ("6", "[HT40+]", "2437", "g"),
              ("40", "[HT40-]", "5200", "a"),
              ("36", "[HT40+]", "5180", "a")]
    for channel, capab, freq, mode in setups:
        ap_params = {"hw_mode": mode,
                     "channel": channel,
                     "country_code": "US",
                     "ieee80211n": "1",
                     "ht_capab": capab,
                     "ieee80211w": "1",
                     "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
        for idx in range(2):
            dev[idx].flush_scan_cache()
        # dev[0] associates with HT disabled, dev[1] with default settings
        for sta_ocv in ("0", "1"):
            dev[0].connect(ssid, psk=passphrase, scan_freq=freq,
                           ocv=sta_ocv, ieee80211w="1", disable_ht="1")
            dev[1].connect(ssid, psk=passphrase, scan_freq=freq,
                           ocv=sta_ocv, ieee80211w="1")
            dev[0].wait_regdom(country_ie=True)
            for idx in range(2):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(2):
                dev[idx].wait_disconnected()
        # Stop the AP before starting the next configuration
        hapd.disable()
160
@remote_compatible
def test_wpa2_ocv_vht40(dev, apdev):
    """OCV with VHT40 channel"""
    try:
        run_wpa2_ocv_vht40(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        set_world_reg(apdev[0], apdev[1], dev[0])
        for idx in range(3):
            dev[idx].flush_scan_cache()
171
def run_wpa2_ocv_vht40(dev, apdev):
    # Body of test_wpa2_ocv_vht40; the caller takes care of cleanup.
    clear_scan_cache(apdev[0])
    # Per-station additions to the connect() arguments:
    # dev[0] without HT, dev[1] without VHT, dev[2] with default settings
    sta_extra = [{"disable_ht": "1"}, {"disable_vht": "1"}, {}]
    # (channel, ht_capab, station scan frequency)
    setups = [("40", "[HT40-]", "5200"),
              ("36", "[HT40+]", "5180")]
    for channel, capab, freq in setups:
        ap_params = {"hw_mode": "a",
                     "channel": channel,
                     "country_code": "US",
                     "ht_capab": capab,
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "vht_oper_chwidth": "0",
                     "vht_oper_centr_freq_seg0_idx": "38",
                     "ieee80211w": "1",
                     "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
        for idx in range(3):
            dev[idx].flush_scan_cache()
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(3):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(3):
                dev[idx].wait_disconnected()
        # Stop the AP before starting the next configuration
        hapd.disable()
205
@remote_compatible
def test_wpa2_ocv_vht80(dev, apdev):
    """OCV with VHT80 channel"""
    try:
        run_wpa2_ocv_vht80(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        set_world_reg(apdev[0], apdev[1], dev[0])
        for idx in range(3):
            dev[idx].flush_scan_cache()
216
def run_wpa2_ocv_vht80(dev, apdev):
    # Body of test_wpa2_ocv_vht80; the caller takes care of cleanup.
    clear_scan_cache(apdev[0])
    # Per-station additions to the connect() arguments:
    # dev[0] without HT, dev[1] without VHT, dev[2] with default settings
    sta_extra = [{"disable_ht": "1"}, {"disable_vht": "1"}, {}]
    # (channel, ht_capab, station scan frequency)
    setups = [("40", "[HT40-]", "5200"),
              ("36", "[HT40+]", "5180")]
    for channel, capab, freq in setups:
        ap_params = {"hw_mode": "a",
                     "channel": channel,
                     "country_code": "US",
                     "ht_capab": capab,
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "vht_oper_chwidth": "1",
                     "vht_oper_centr_freq_seg0_idx": "42",
                     "ieee80211w": "1",
                     "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(3):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(3):
                dev[idx].wait_disconnected()
        # Stop the AP before starting the next configuration
        hapd.disable()
247
@remote_compatible
def test_wpa2_ocv_vht160(dev, apdev):
    """OCV with VHT160 channel"""
    try:
        run_wpa2_ocv_vht160(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        set_world_reg(apdev[0], apdev[1], dev[0])
        for idx in range(3):
            dev[idx].flush_scan_cache()
258
def run_wpa2_ocv_vht160(dev, apdev):
    # Body of test_wpa2_ocv_vht160; the caller takes care of cleanup.
    # Per-station additions to the connect() arguments:
    # dev[0] without HT, dev[1] without VHT, dev[2] with default settings
    sta_extra = [{"disable_ht": "1"}, {"disable_vht": "1"}, {}]
    # (channel, ht_capab, station scan frequency)
    setups = [("100", "[HT40+]", "5500"),
              ("104", "[HT40-]", "5520")]
    for channel, capab, freq in setups:
        ap_params = {"hw_mode": "a",
                     "channel": channel,
                     "country_code": "ZA",
                     "ht_capab": capab,
                     "vht_capab": "[VHT160]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "vht_oper_chwidth": "2",
                     "vht_oper_centr_freq_seg0_idx": "114",
                     "ieee80211w": "1",
                     "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(3):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(3):
                dev[idx].wait_disconnected()
        # Stop the AP before starting the next configuration
        hapd.disable()
289
@remote_compatible
def test_wpa2_ocv_vht80plus80(dev, apdev):
    """OCV with VHT80+80 channel"""
    try:
        run_wpa2_ocv_vht80plus80(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        set_world_reg(apdev[0], apdev[1], dev[0])
        # Allow up to 0.5 seconds for a regulatory domain change event before
        # the scan caches are flushed
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        for idx in range(3):
            dev[idx].flush_scan_cache()
301
def run_wpa2_ocv_vht80plus80(dev, apdev):
    # Body of test_wpa2_ocv_vht80plus80; the caller takes care of cleanup.
    # Per-station additions to the connect() arguments:
    # dev[0] without HT, dev[1] without VHT, dev[2] with default settings
    sta_extra = [{"disable_ht": "1"}, {"disable_vht": "1"}, {}]
    # (channel, ht_capab, station scan frequency)
    setups = [("36", "[HT40+]", "5180"),
              ("40", "[HT40-]", "5200")]
    for channel, capab, freq in setups:
        ap_params = {"hw_mode": "a",
                     "channel": channel,
                     "country_code": "US",
                     "ht_capab": capab,
                     "vht_capab": "[VHT160-80PLUS80]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "vht_oper_chwidth": "3",
                     "vht_oper_centr_freq_seg0_idx": "42",
                     "vht_oper_centr_freq_seg1_idx": "155",
                     "ieee80211w": "1",
                     "ieee80211d": "1",
                     "ieee80211h": "1",
                     "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(3):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(3):
                dev[idx].wait_disconnected()

        # Associate all stations once more (OCV enabled, default HT/VHT
        # settings) so that they are connected when the AP gets disabled.
        for idx in range(3):
            dev[idx].connect(ssid, psk=passphrase, scan_freq=freq, ocv="1",
                             ieee80211w="1")
            if idx == 0:
                dev[idx].wait_regdom(country_ie=True)
        hapd.disable()
        for idx in range(3):
            dev[idx].request("DISCONNECT")
        for idx in range(3):
            dev[idx].disconnect_and_stop_scan()
344
class APConnection:
    """Test the AP side of the 4-way handshake with crafted station frames.

    hostapd and the station are both switched to external EAPOL frame I/O.
    The frames sent by hostapd are read by the test script and the replies
    (EAPOL-Key msg 2/4 and 4/4) are built by the script instead of
    wpa_supplicant, which allows arbitrary OCI KDE contents to be used.
    """

    def init_params(self):
        # Static parameters
        self.ssid = "test-wpa2-ocv"
        self.passphrase = "qwertyuiop"
        self.psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"

        # Dynamic parameters (filled in by __init__)
        self.hapd = self.dev = None
        self.addr = self.bssid = None
        self.rsne = self.msg = None
        self.kck = self.kek = None
        self.anonce = self.snonce = None

    def __init__(self, apdev, dev, params):
        """Start the AP, begin association, and wait for EAPOL-Key msg 1/4.

        params holds the hostapd configuration plus the test specific keys
        "freq" (mandatory; station scan frequency), "ap_ocv" and "sta_ocv"
        (optional; both default to "1"). The test specific keys are removed
        from params and the WPA2 settings are added to it in place.
        """
        self.init_params()

        # OCV is enabled on both sides unless the caller overrides it
        ap_ocv = params.pop("ap_ocv", "1")
        sta_ocv = params.pop("sta_ocv", "1")
        freq = params.pop("freq")

        wpa2 = hostapd.wpa2_params(ssid=self.ssid, passphrase=self.passphrase)
        params.update(wpa2)
        params["wpa_pairwise_update_count"] = "10"
        params["ocv"] = ap_ocv
        try:
            self.hapd = hostapd.add_ap(apdev, params)
        except Exception as e:
            if "Failed to set hostapd parameter ocv" not in str(e):
                raise
            raise HwsimSkip("OCV not supported")
        self.hapd.request("SET ext_eapol_frame_io 1")
        self.dev = dev
        dev.request("SET ext_eapol_frame_io 1")

        self.bssid = apdev['bssid']
        pmk = binascii.unhexlify("c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7")

        # The two RSNE variants differ only in the RSN Capabilities field
        # (8c40 vs. 8c00)
        if sta_ocv == "0":
            rsne_hex = "301a0100000fac040100000fac040100000fac028c000000000fac06"
        else:
            rsne_hex = "301a0100000fac040100000fac040100000fac028c400000000fac06"
        self.rsne = binascii.unhexlify(rsne_hex)
        self.snonce = b'\x11' * 32

        dev.connect(self.ssid, raw_psk=self.psk, scan_freq=freq, ocv=sta_ocv,
                    ieee80211w="1", wait_connect=False)
        if "country_code" in params:
            dev.wait_regdom(country_ie=True)
        self.addr = dev.p2p_interface_addr()

        # Reception of EAPOL-Key msg 1/4 from hostapd indicates that the
        # station has associated
        self.msg = recv_eapol(self.hapd)
        self.anonce = self.msg['rsn_key_nonce']
        ptk, self.kck, self.kek = pmk_to_ptk(pmk, self.addr, self.bssid,
                                             self.snonce, self.anonce)

    def test_bad_oci(self, logmsg, op_class, channel, seg1_idx):
        """Send msg 2/4 with an unacceptable OCI and expect msg 1/4 again.

        No OCI KDE is included at all if op_class is None. An exception is
        raised (after requesting the station to disconnect) unless the next
        frame from hostapd has the same ANonce and Key Information 0x008a.
        """
        logger.debug("Bad OCI element: " + logmsg)
        ocikde = b''
        if op_class is not None:
            ocikde = make_ocikde(op_class, channel, seg1_idx)

        reply_eapol("2/4", self.hapd, self.addr, self.msg, 0x010a, self.snonce,
                    self.rsne + ocikde, self.kck)
        self.msg = recv_eapol(self.hapd)
        is_msg_1_4 = (self.anonce == self.msg['rsn_key_nonce'] and
                      self.msg["rsn_key_info"] == 0x008a)
        if not is_msg_1_4:
            self.dev.request("DISCONNECT")
            raise Exception("Didn't receive retransmitted 1/4")

    def confirm_valid_oci(self, op_class, channel, seg1_idx):
        """Complete the 4-way handshake using the given OCI in msg 2/4.

        An exception is raised unless hostapd answers msg 2/4 with a frame
        that has the same ANonce and Key Information 0x13ca. Msg 4/4 is then
        sent and the station entry is waited for on the AP.
        """
        logger.debug("Valid OCI element to complete handshake")
        key_data = self.rsne + make_ocikde(op_class, channel, seg1_idx)

        reply_eapol("2/4", self.hapd, self.addr, self.msg, 0x010a, self.snonce,
                    key_data, self.kck)
        self.msg = recv_eapol(self.hapd)
        is_msg_3_4 = (self.anonce == self.msg['rsn_key_nonce'] and
                      self.msg["rsn_key_info"] == 0x13ca)
        if not is_msg_3_4:
            raise Exception("Didn't receive 3/4 in response to valid 2/4")

        reply_eapol("4/4", self.hapd, self.addr, self.msg, 0x030a, None, None,
                    self.kck)
        self.hapd.wait_sta(timeout=15)
436
@remote_compatible
def test_wpa2_ocv_ap_mismatch(dev, apdev):
    """OCV AP mismatch"""
    params = {"channel": "1",
              "ieee80211w": "1",
              "freq": "2412"}
    conn = APConnection(apdev[0], dev[0], params)
    # (description, operating class, channel, segment 1 index)
    bad_oci = [("element missing", None, 0, 0),
               ("wrong channel number", 81, 6, 0),
               ("invalid channel number", 81, 0, 0),
               ("wrong operating class", 80, 0, 0),
               ("invalid operating class", 0, 0, 0)]
    for case in bad_oci:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(81, 1, 0)
450
@remote_compatible
def test_wpa2_ocv_ap_ht_mismatch(dev, apdev):
    """OCV AP mismatch (HT)"""
    clear_scan_cache(apdev[0])
    params = {"channel": "6",
              "ht_capab": "[HT40-]",
              "ieee80211w": "1",
              "freq": "2437"}
    conn = APConnection(apdev[0], dev[0], params)
    # (description, operating class, channel, segment 1 index)
    bad_oci = [("wrong primary channel", 84, 5, 0),
               ("lower bandwidth than negotiated", 81, 6, 0),
               ("bad upper/lower channel", 83, 6, 0)]
    for case in bad_oci:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(84, 6, 0)
464
@remote_compatible
def test_wpa2_ocv_ap_vht80_mismatch(dev, apdev):
    """OCV AP mismatch (VHT80)"""
    try:
        run_wpa2_ocv_ap_vht80_mismatch(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        sta.flush_scan_cache()
473
def run_wpa2_ocv_ap_vht80_mismatch(dev, apdev):
    # Body of test_wpa2_ocv_ap_vht80_mismatch; the caller takes care of
    # regulatory domain cleanup.
    params = {"hw_mode": "a",
              "channel": "36",
              "country_code": "US",
              "ht_capab": "[HT40+]",
              "ieee80211w": "1",
              "ieee80211n": "1",
              "ieee80211ac": "1",
              "vht_oper_chwidth": "1",
              "freq": "5180",
              "vht_oper_centr_freq_seg0_idx": "42"}
    sta = dev[0]
    conn = APConnection(apdev[0], sta, params)
    # (description, operating class, channel, segment 1 index)
    bad_oci = [("wrong primary channel", 128, 38, 0),
               ("wrong primary channel", 128, 32, 0),
               ("smaller bandwidth than negotiated", 116, 36, 0),
               ("smaller bandwidth than negotiated", 115, 36, 0)]
    for case in bad_oci:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(128, 36, 0)

    sta.dump_monitor()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
495
@remote_compatible
def test_wpa2_ocv_ap_vht160_mismatch(dev, apdev):
    """OCV AP mismatch (VHT160)"""
    try:
        run_wpa2_ocv_ap_vht160_mismatch(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        # Allow up to 0.5 seconds for a regulatory domain change event before
        # the scan cache is flushed
        sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        sta.flush_scan_cache()
505
def run_wpa2_ocv_ap_vht160_mismatch(dev, apdev):
    # Body of test_wpa2_ocv_ap_vht160_mismatch; the caller takes care of
    # regulatory domain cleanup.
    params = {"hw_mode": "a",
              "channel": "100",
              "country_code": "ZA",
              "ht_capab": "[HT40+]",
              "ieee80211w": "1",
              "ieee80211n": "1",
              "ieee80211ac": "1",
              "vht_oper_chwidth": "2",
              "freq": "5500",
              "vht_oper_centr_freq_seg0_idx": "114",
              "ieee80211d": "1",
              "ieee80211h": "1"}
    sta = dev[0]
    conn = APConnection(apdev[0], sta, params)
    # (description, operating class, channel, segment 1 index)
    bad_oci = [("wrong primary channel", 129, 36, 0),
               ("wrong primary channel", 129, 114, 0),
               ("smaller bandwidth (20 Mhz) than negotiated", 121, 100, 0),
               ("smaller bandwidth (40 Mhz) than negotiated", 122, 100, 0),
               ("smaller bandwidth (80 Mhz) than negotiated", 128, 100, 0)]
    for case in bad_oci:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(129, 100, 0)

    sta.dump_monitor()
    # Disable the AP before disconnecting the station
    if conn.hapd:
        conn.hapd.request("DISABLE")
    sta.disconnect_and_stop_scan()
531
@remote_compatible
def test_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev):
    """OCV AP mismatch (VHT80+80)"""
    try:
        run_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev)
    finally:
        # Cleanup is done regardless of the outcome of the test body
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        # Allow up to 0.5 seconds for a regulatory domain change event before
        # the scan cache is flushed
        sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        sta.flush_scan_cache()
541
def run_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev):
    # Body of test_wpa2_ocv_ap_vht80plus80_mismatch; the caller takes care of
    # regulatory domain cleanup.
    params = {"hw_mode": "a",
              "channel": "36",
              "country_code": "US",
              "ht_capab": "[HT40+]",
              "ieee80211w": "1",
              "ieee80211n": "1",
              "ieee80211ac": "1",
              "vht_oper_chwidth": "3",
              "freq": "5180",
              "vht_oper_centr_freq_seg0_idx": "42",
              "ieee80211d": "1",
              "vht_oper_centr_freq_seg1_idx": "155",
              "ieee80211h": "1"}
    sta = dev[0]
    conn = APConnection(apdev[0], sta, params)
    # (description, operating class, channel, segment 1 index)
    bad_oci = [("using 80 MHz operating class", 128, 36, 155),
               ("wrong frequency segment 1", 130, 36, 138)]
    for case in bad_oci:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(130, 36, 155)

    sta.dump_monitor()
    # Disable the AP before disconnecting the station
    if conn.hapd:
        conn.hapd.request("DISABLE")
    sta.disconnect_and_stop_scan()
565
@remote_compatible
def test_wpa2_ocv_ap_unexpected1(dev, apdev):
    """OCV and unexpected OCI KDE from station"""
    # OCV disabled on the AP, enabled on the station
    ap_params = {"channel": "1",
                 "ieee80211w": "1",
                 "ap_ocv": "0",
                 "sta_ocv": "1",
                 "freq": "2412"}
    ap_conn = APConnection(apdev[0], dev[0], ap_params)
    logger.debug("Client will send OCI KDE even if it was not negotiated")
    ap_conn.confirm_valid_oci(81, 1, 0)
577
@remote_compatible
def test_wpa2_ocv_ap_unexpected2(dev, apdev):
    """OCV and unexpected OCI KDE from station"""
    # OCV enabled on the AP, disabled on the station
    ap_params = {"channel": "1",
                 "ieee80211w": "1",
                 "ap_ocv": "1",
                 "sta_ocv": "0",
                 "freq": "2412"}
    ap_conn = APConnection(apdev[0], dev[0], ap_params)
    logger.debug("Client will send OCI KDE even if it was not negotiated")
    ap_conn.confirm_valid_oci(81, 1, 0)
589
@remote_compatible
def test_wpa2_ocv_ap_retransmit_msg3(dev, apdev):
    """Verify that manually retransmitted msg 3/4 contain a correct OCI"""
    def wait_eapol_tx(src, name):
        # Return the next EAPOL-TX event from src or fail after 15 seconds
        event = src.wait_event(["EAPOL-TX"], timeout=15)
        if event is None:
            raise Exception("Timeout on EAPOL-TX from " + name)
        return event

    bssid = apdev[0]['bssid']
    ssid = "test-wpa2-ocv"
    passphrase = "qwertyuiop"
    psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"
    params = hostapd.wpa2_params(ssid=ssid)
    params["wpa_psk"] = psk
    params["ieee80211w"] = "1"
    params["ocv"] = "1"
    params['wpa_disable_eapol_key_retries'] = "1"
    try:
        hapd = hostapd.add_ap(apdev[0], params)
    except Exception as e:
        if "Failed to set hostapd parameter ocv" not in str(e):
            raise
        raise HwsimSkip("OCV not supported")

    sta = dev[0]
    # EAPOL frames are relayed by the test script during the first part of
    # the handshake
    hapd.request("SET ext_eapol_frame_io 1")
    sta.request("SET ext_eapol_frame_io 1")
    sta.connect(ssid, psk=passphrase, scan_freq="2412", wait_connect=False,
                ocv="1", ieee80211w="1")
    addr = sta.own_addr()

    # EAPOL-Key msg 1/4
    msg1 = wait_eapol_tx(hapd, "hostapd")
    res = sta.request("EAPOL_RX " + bssid + " " + msg1.split(' ')[2])
    if "OK" not in res:
        raise Exception("EAPOL_RX to wpa_supplicant failed")

    # EAPOL-Key msg 2/4
    msg2 = wait_eapol_tx(sta, "wpa_supplicant")
    res = hapd.request("EAPOL_RX " + addr + " " + msg2.split(' ')[2])
    if "OK" not in res:
        raise Exception("EAPOL_RX to hostapd failed")

    # EAPOL-Key msg 3/4; this frame is not delivered to the station
    wait_eapol_tx(hapd, "hostapd")
    logger.info("Drop the first EAPOL-Key msg 3/4")

    # Use normal EAPOL TX/RX to handle retries.
    hapd.request("SET ext_eapol_frame_io 0")
    sta.request("SET ext_eapol_frame_io 0")

    # Manually retransmit EAPOL-Key msg 3/4
    res = hapd.request("RESEND_M3 " + addr)
    if "OK" not in res:
        raise Exception("RESEND_M3 failed")

    sta.wait_connected()
    hwsim_utils.test_connectivity(sta, hapd)
646
def test_wpa2_ocv_ap_group_hs(dev, apdev):
    """OCV group handshake (AP)"""
    # Key Information value expected in EAPOL-Key group msg 1/2
    group_msg_1_2 = 0x1382

    def expect_group_msg_1_2():
        # The next frame from hostapd has to be group msg 1/2
        conn.msg = recv_eapol(hapd)
        if conn.msg["rsn_key_info"] != group_msg_1_2:
            raise Exception("Didn't receive 1/2 of group key handshake")

    def send_group_msg_2_2(ocikde, counter):
        # Reply with group msg 2/2 that contains only the given OCI KDE
        reply = build_eapol_key_2_2(conn.kck, ocikde, replay_counter=counter)
        hapd.dump_monitor()
        send_eapol(hapd, conn.addr, build_eapol(reply))

    params = {"channel": "1",
              "ieee80211w": "1",
              "freq": "2412",
              "wpa_strict_rekey": "1"}
    conn = APConnection(apdev[0], dev[0], params)
    conn.confirm_valid_oci(81, 1, 0)
    hapd = conn.hapd

    # The second station uses the normal EAPOL path to get connected
    hapd.request("SET ext_eapol_frame_io 0")
    dev[1].connect(conn.ssid, psk=conn.passphrase, scan_freq="2412", ocv="1",
                   ieee80211w="1")
    hapd.wait_sta()
    hapd.request("SET ext_eapol_frame_io 1")

    # Trigger a group key handshake
    dev[1].request("DISCONNECT")
    dev[0].dump_monitor()

    # Wait for EAPOL-Key msg 1/2
    expect_group_msg_1_2()

    # Send a EAPOL-Key msg 2/2 with a bad OCI
    logger.info("Bad OCI element")
    send_group_msg_2_2(make_ocikde(1, 1, 1), 3)

    # Wait for retransmitted EAPOL-Key msg 1/2
    expect_group_msg_1_2()

    # Send a EAPOL-Key msg 2/2 with a good OCI
    logger.info("Good OCI element")
    send_group_msg_2_2(make_ocikde(81, 1, 0), 4)

    # Verify that group key handshake has completed: no further msg 1/2 may
    # be transmitted by the AP within one second
    ev = hapd.wait_event(["EAPOL-TX"], timeout=1)
    if ev is None:
        return
    frame = parse_eapol(binascii.unhexlify(ev.split(' ')[2]))
    if frame["rsn_key_info"] == group_msg_1_2:
        raise Exception("AP didn't accept 2/2 of group key handshake")
697
class STAConnection:
    """Test the station side of the 4-way handshake with crafted AP frames.

    hostapd and the station are both switched to external EAPOL frame I/O.
    EAPOL-Key msg 1/4 from hostapd is forwarded to the station and msg 2/4
    from the station is captured; every msg 3/4 is then built by the test
    script, which allows arbitrary OCI KDE contents to be used.
    """

    def init_params(self):
        # Static parameters
        self.ssid = "test-wpa2-ocv"
        self.passphrase = "qwertyuiop"
        self.psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"

        # Dynamic parameters (filled in by __init__)
        self.hapd = self.dev = None
        self.addr = self.bssid = None
        self.rsne = self.gtkie = None
        self.kck = self.kek = None
        self.anonce = self.snonce = None
        self.msg = None
        self.counter = None

    def __init__(self, apdev, dev, params, sta_params=None):
        """Start the AP and run the handshake up to and including msg 2/4.

        params holds the hostapd configuration plus the mandatory key
        "freq" (station scan frequency), which is removed from it; the WPA2
        settings are added to params in place. sta_params holds additional
        connect() arguments for the station; "ocv" and "ieee80211w" default
        to "1" and are added to a dictionary passed in by the caller.
        """
        self.init_params()
        self.dev = dev
        self.bssid = apdev['bssid']

        freq = params.pop("freq")
        if sta_params is None:
            sta_params = {}
        sta_params.setdefault("ocv", "1")
        sta_params.setdefault("ieee80211w", "1")

        wpa2 = hostapd.wpa2_params(ssid=self.ssid, passphrase=self.passphrase)
        params.update(wpa2)
        params['wpa_pairwise_update_count'] = "10"

        try:
            self.hapd = hostapd.add_ap(apdev, params)
        except Exception as e:
            if "Failed to set hostapd parameter ocv" not in str(e):
                raise
            raise HwsimSkip("OCV not supported")
        self.hapd.request("SET ext_eapol_frame_io 1")
        self.dev.request("SET ext_eapol_frame_io 1")
        pmk = binascii.unhexlify("c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7")

        self.gtkie = binascii.unhexlify("dd16000fac010100dc11188831bf4aa4a8678d2b41498618")
        # The two RSNE variants differ only in the RSN Capabilities field
        # (8c40 vs. 8c00)
        if sta_params["ocv"] == "0":
            rsne_hex = "30140100000fac040100000fac040100000fac028c00"
        else:
            rsne_hex = "30140100000fac040100000fac040100000fac028c40"
        self.rsne = binascii.unhexlify(rsne_hex)

        self.dev.connect(self.ssid, raw_psk=self.psk, scan_freq=freq,
                         wait_connect=False, **sta_params)
        if "country_code" in params:
            self.dev.wait_regdom(country_ie=True)
        self.addr = dev.p2p_interface_addr()

        # Forward msg 1/4 from AP to STA
        self.msg = recv_eapol(self.hapd)
        self.anonce = self.msg['rsn_key_nonce']
        send_eapol(self.dev, self.bssid, build_eapol(self.msg))

        # Capture msg 2/4 from the STA so we can derive the session keys
        self.msg = recv_eapol(dev)
        self.snonce = self.msg['rsn_key_nonce']
        ptk, self.kck, self.kek = pmk_to_ptk(pmk, self.addr, self.bssid,
                                             self.snonce, self.anonce)

        # The first msg 3/4 uses the replay counter following the one seen
        # in msg 2/4
        last_counter, = struct.unpack('>Q', self.msg['rsn_replay_counter'])
        self.counter = last_counter + 1

    def _send_msg_3_4(self, ocikde):
        # Send msg 3/4 whose Key Data is RSNE + GTK KDE + ocikde wrapped with
        # the KEK; the replay counter is incremented for the next frame.
        plain = self.rsne + self.gtkie + ocikde
        wrapped = aes_wrap(self.kek, pad_key_data(plain))
        msg = build_eapol_key_3_4(self.anonce, self.kck, wrapped,
                                  replay_counter=self.counter)

        self.dev.dump_monitor()
        send_eapol(self.dev, self.bssid, build_eapol(msg))
        self.counter += 1

    def test_bad_oci(self, logmsg, op_class, channel, seg1_idx, errmsg):
        """Send msg 3/4 with an unacceptable OCI and expect errmsg.

        No OCI KDE is included at all if op_class is None. An exception is
        raised if the station does not report an event matching errmsg
        within five seconds.
        """
        logger.info("Bad OCI element: " + logmsg)
        ocikde = b''
        if op_class is not None:
            ocikde = make_ocikde(op_class, channel, seg1_idx)

        self._send_msg_3_4(ocikde)

        if self.dev.wait_event([errmsg], timeout=5) is None:
            raise Exception("Bad OCI not reported")

    def confirm_valid_oci(self, op_class, channel, seg1_idx):
        """Send msg 3/4 with the given OCI and wait for the connection."""
        logger.debug("Valid OCI element to complete handshake")
        self._send_msg_3_4(make_ocikde(op_class, channel, seg1_idx))

        self.dev.wait_connected(timeout=1)
806
@remote_compatible
def test_wpa2_ocv_mismatch_client(dev, apdev):
    """OCV client mismatch"""
    params = dict(channel="1", ieee80211w="1", ocv="1", freq="2412")
    conn = STAConnection(apdev[0], dev[0], params)

    # (log text, op_class, channel, seg1_idx, error expected from the STA)
    bad_ocis = (
        ("element missing", None, 0, 0,
         "did not receive mandatory OCI"),
        ("wrong channel number", 81, 6, 0,
         "primary channel mismatch"),
        ("invalid channel number", 81, 0, 0,
         "unable to interpret received OCI"),
        ("wrong operating class", 80, 0, 0,
         "unable to interpret received OCI"),
        ("invalid operating class", 0, 0, 0,
         "unable to interpret received OCI"),
    )
    for logmsg, op_class, channel, seg1_idx, errmsg in bad_ocis:
        conn.test_bad_oci(logmsg, op_class, channel, seg1_idx, errmsg)

    # Finish with an OCI naming channel 1, the channel the AP was set up on
    conn.confirm_valid_oci(81, 1, 0)
826
@remote_compatible
def test_wpa2_ocv_vht160_mismatch_client(dev, apdev):
    """OCV client mismatch (VHT160)"""
    try:
        run_wpa2_ocv_vht160_mismatch_client(dev, apdev)
    finally:
        # The test body sets a country code on the AP, so the regulatory
        # state is always restored, whether the body passed or raised.
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        sta.flush_scan_cache()
836
def run_wpa2_ocv_vht160_mismatch_client(dev, apdev):
    """Body of the VHT160 OCV mismatch test.

    Sets up an AP on channel 100 (country code ZA, vht_oper_chwidth 2) with
    the station connecting with disable_vht=1, feeds the station several
    mismatching OCIs, then completes the handshake with a valid one and
    tears the AP and the station connection down.
    """
    sta = dev[0]
    ap_params = {"hw_mode": "a",
                 "channel": "100",
                 "country_code": "ZA",
                 "ht_capab": "[HT40+]",
                 "ieee80211w": "1",
                 "ieee80211n": "1",
                 "ieee80211ac": "1",
                 "vht_oper_chwidth": "2",
                 "ocv": "1",
                 "vht_oper_centr_freq_seg0_idx": "114",
                 "freq": "5500",
                 "ieee80211d": "1",
                 "ieee80211h": "1"}
    conn = STAConnection(apdev[0], sta, ap_params, {"disable_vht": "1"})

    # (log text, op_class, channel, seg1_idx, error expected from the STA)
    bad_ocis = (
        ("smaller bandwidth (20 Mhz) than negotiated",
         121, 100, 0, "channel bandwidth mismatch"),
        ("wrong frequency, bandwith, and secondary channel",
         123, 104, 0, "primary channel mismatch"),
        ("wrong upper/lower behaviour",
         129, 104, 0, "primary channel mismatch"),
    )
    for bad_oci in bad_ocis:
        conn.test_bad_oci(*bad_oci)
    conn.confirm_valid_oci(122, 100, 0)

    sta.dump_monitor()
    if conn.hapd:
        conn.hapd.request("DISABLE")
    sta.disconnect_and_stop_scan()
865
def test_wpa2_ocv_sta_group_hs(dev, apdev):
    """OCV group handshake (STA)"""
    sta = dev[0]
    params = dict(channel="1", ieee80211w="1", ocv="1", freq="2412",
                  wpa_strict_rekey="1")

    def send_group_msg_1_2(conn, ocikde, replay_counter):
        # Key data is the GTK KDE followed by the OCI KDE, wrapped with the KEK
        key_data = aes_wrap(conn.kek, pad_key_data(conn.gtkie + ocikde))
        frame = build_eapol_key_1_2(conn.kck, key_data,
                                    replay_counter=replay_counter)
        send_eapol(sta, conn.bssid, build_eapol(frame))

    # Each STAConnection gets its own copy of the AP parameters
    conn = STAConnection(apdev[0], sta, params.copy())
    conn.confirm_valid_oci(81, 1, 0)

    # Send a EAPOL-Key msg 1/2 with a bad OCI
    logger.info("Bad OCI element")
    send_group_msg_1_2(conn, make_ocikde(1, 1, 1), 3)

    # We shouldn't get a EAPOL-Key message back
    if sta.wait_event(["EAPOL-TX"], timeout=1) is not None:
        raise Exception("Received response to invalid EAPOL-Key 1/2")

    # Reset AP to try with valid OCI
    conn.hapd.disable()
    conn = STAConnection(apdev[0], sta, params.copy())
    conn.confirm_valid_oci(81, 1, 0)

    # Send a EAPOL-Key msg 1/2 with a good OCI
    logger.info("Good OCI element")
    send_group_msg_1_2(conn, make_ocikde(81, 1, 0), 4)

    # Wait for EAPOL-Key msg 2/2
    conn.msg = recv_eapol(sta)
    if conn.msg["rsn_key_info"] != 0x0302:
        raise Exception("Didn't receive 2/2 of group key handshake")
904
def test_wpa2_ocv_auto_enable_pmf(dev, apdev):
    """OCV on 2.4 GHz with PMF getting enabled automatically"""
    # The AP configuration enables OCV without setting ieee80211w
    ap_params = dict(channel="1", ocv="1")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]
    # Connect once with OCV disabled and once with OCV enabled on the STA
    for sta_ocv in ("0", "1"):
        sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv=sta_ocv,
                    ieee80211w="2")
        sta.request("REMOVE_NETWORK all")
        sta.wait_disconnected()
915
def test_wpa2_ocv_auto_enable_pmf_on_sta(dev, apdev):
    """OCV on 2.4 GHz with PMF getting enabled automatically on STA"""
    ap_params = dict(channel="1", ieee80211w="1", ocv="1")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    # The STA network sets ocv=1 but passes no ieee80211w value
    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", ocv="1")
923
def test_wpa2_ocv_sta_override_eapol(dev, apdev):
    """OCV on 2.4 GHz and STA override EAPOL-Key msg 2/4"""
    ap_params = dict(channel="1", ieee80211w="2", ocv="1")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)

    sta = dev[0]
    # STA is told to use 2462 for the OCI in EAPOL-Key frames while the
    # network is scanned on 2412
    sta.set("oci_freq_override_eapol", "2462")
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2", wait_connect=False)
    outcome = sta.wait_event(["CTRL-EVENT-CONNECTED",
                              "CTRL-EVENT-DISCONNECTED"], timeout=15)
    # Stop the connection attempt before evaluating the outcome
    sta.request("DISCONNECT")
    if outcome is None:
        raise Exception("No connection result reported")
    if "CTRL-EVENT-CONNECTED" in outcome:
        raise Exception("Unexpected connection")
    if "reason=15" not in outcome:
        raise Exception("Unexpected disconnection reason: " + outcome)

    # The AP side must have flagged the mismatch on msg 2/4
    check_ocv_failure(hapd, "EAPOL-Key msg 2/4", "eapol-key-m2",
                      sta.own_addr())
945
def test_wpa2_ocv_sta_override_sa_query_req(dev, apdev):
    """OCV on 2.4 GHz and STA override SA Query Request"""
    ap_params = dict(channel="1", ieee80211w="2", ocv="1")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")
    hapd.wait_sta()

    # First round: SA Query Request with the OCI frequency overridden
    sta.set("oci_freq_override_saquery_req", "2462")
    if "OK" not in sta.request("UNPROT_DEAUTH"):
        raise Exception("Triggering SA Query from the STA failed")
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=3) is None:
        raise Exception("Disconnection after failed SA Query not reported")

    # Second round: override cleared, the STA must stay connected
    sta.set("oci_freq_override_saquery_req", "0")
    sta.wait_connected()
    if "OK" not in sta.request("UNPROT_DEAUTH"):
        raise Exception("Triggering SA Query from the STA failed")
    check_ocv_failure(hapd, "SA Query Request", "saqueryreq",
                      sta.own_addr())
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=3) is not None:
        raise Exception("SA Query from the STA failed")
970
def test_wpa2_ocv_sta_override_sa_query_resp(dev, apdev):
    """OCV on 2.4 GHz and STA override SA Query Response"""
    ap_params = dict(channel="1", ieee80211w="2", ocv="1")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")
    sta.set("oci_freq_override_saquery_resp", "2462")
    hapd.wait_sta(wait_4way_hs=True)

    # The AP starts the SA Query; the STA answers with the overridden OCI
    sa_query_cmd = "SA_QUERY " + sta.own_addr()
    if "OK" not in hapd.request(sa_query_cmd):
        raise Exception("SA_QUERY failed")
    check_ocv_failure(hapd, "SA Query Response", "saqueryresp",
                      sta.own_addr())
985
def check_ocv_failure(dev, frame_txt, frame, addr):
    """Wait for an OCV-FAILURE event on dev and validate its contents.

    dev: object providing wait_event(); the event is awaited for 3 seconds
    frame_txt: human-readable frame name used in the "not reported" error
    frame: value expected after "frame=" in the event
    addr: value expected after "addr=" in the event

    The event must also contain "error=primary channel mismatch". Raises
    Exception when no event arrives or when any of the fields is missing.
    """
    event = dev.wait_event(["OCV-FAILURE"], timeout=3)
    if event is None:
        raise Exception("OCV failure for %s not reported" % frame_txt)

    # (substring that must be present, error text used when it is missing)
    expectations = (
        ("addr=" + addr, "Unexpected OCV failure addr: "),
        ("frame=" + frame, "Unexpected OCV failure frame: "),
        ("error=primary channel mismatch", "Unexpected OCV failure error: "),
    )
    for needle, complaint in expectations:
        if needle not in event:
            raise Exception(complaint + event)
996
def test_wpa2_ocv_ap_override_eapol_m3(dev, apdev):
    """OCV on 2.4 GHz and AP override EAPOL-Key msg 3/4"""
    # Variant with the override included in the initial AP configuration
    run_wpa2_ocv_ap_override_eapol_m3(dev, apdev, post_enable=False)
1000
def test_wpa2_ocv_ap_override_eapol_m3_post_enable(dev, apdev):
    """OCV on 2.4 GHz and AP override EAPOL-Key msg 3/4 (post enable)"""
    # Variant with the override applied after the AP has been started
    run_wpa2_ocv_ap_override_eapol_m3(dev, apdev, post_enable=True)
1004
def run_wpa2_ocv_ap_override_eapol_m3(dev, apdev, post_enable=False):
    """Run the AP-side OCI override test for EAPOL-Key msg 3/4.

    post_enable: when False the override is part of the configuration the AP
        is started with; when True it is set on the running AP instead.

    The station must report an OCV failure for frame "eapol-key-m3" from the
    AP's address and then disconnect with reason=15.
    """
    override_name = "oci_freq_override_eapol_m3"
    override_freq = "2462"

    ap_params = dict(channel="1", ieee80211w="2", ocv="1")
    if not post_enable:
        ap_params[override_name] = override_freq
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()
    if post_enable:
        hapd.set(override_name, override_freq)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2", wait_connect=False)

    check_ocv_failure(sta, "EAPOL-Key msg 3/4", "eapol-key-m3", bssid)

    disconnect_ev = sta.wait_disconnected()
    if "reason=15" not in disconnect_ev:
        raise Exception("Unexpected disconnection reason: " + disconnect_ev)
1023
def test_wpa2_ocv_ap_override_eapol_g1(dev, apdev):
    """OCV on 2.4 GHz and AP override EAPOL-Key group msg 1/2"""
    # Variant with the override included in the initial AP configuration
    run_wpa2_ocv_ap_override_eapol_g1(dev, apdev, post_enable=False)
1027
def test_wpa2_ocv_ap_override_eapol_g1_post_enable(dev, apdev):
    """OCV on 2.4 GHz and AP override EAPOL-Key group msg 1/2 (post enable)"""
    # Variant with the override applied after the STA has connected
    run_wpa2_ocv_ap_override_eapol_g1(dev, apdev, post_enable=True)
1031
def run_wpa2_ocv_ap_override_eapol_g1(dev, apdev, post_enable=False):
    """Run the AP-side OCI override test for EAPOL-Key group msg 1/2.

    post_enable: when False the override is part of the configuration the AP
        is started with; when True it is set on the AP only after the station
        has connected.

    A GTK rekey is requested from the AP and the station must report an OCV
    failure for frame "eapol-key-g1" from the AP's address.
    """
    override_name = "oci_freq_override_eapol_g1"
    override_freq = "2462"

    ap_params = dict(channel="1", ieee80211w="2", ocv="1")
    if not post_enable:
        ap_params[override_name] = override_freq
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")

    if post_enable:
        hapd.set(override_name, override_freq)
    rekey_result = hapd.request("REKEY_GTK")
    if "OK" not in rekey_result:
        raise Exception("REKEY_GTK failed")
    check_ocv_failure(sta, "EAPOL-Key group msg 1/2", "eapol-key-g1", bssid)
1048
def test_wpa2_ocv_ap_override_saquery_req(dev, apdev):
    """OCV on 2.4 GHz and AP override SA Query Request"""
    ap_params = dict(channel="1", ieee80211w="2", ocv="1",
                     oci_freq_override_saquery_req="2462")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")
    hapd.wait_sta(wait_4way_hs=True)

    # AP-initiated SA Query; the STA is the one validating the OCI here
    sa_query_cmd = "SA_QUERY " + sta.own_addr()
    if "OK" not in hapd.request(sa_query_cmd):
        raise Exception("SA_QUERY failed")
    check_ocv_failure(sta, "SA Query Request", "saqueryreq", bssid)
1064
def test_wpa2_ocv_ap_override_saquery_resp(dev, apdev):
    """OCV on 2.4 GHz and AP override SA Query Response"""
    ap_params = dict(channel="1", ieee80211w="2", ocv="1",
                     oci_freq_override_saquery_resp="2462")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")

    # UNPROT_DEAUTH is used to trigger an SA Query from the STA; the AP's
    # response then carries the overridden OCI
    deauth_result = sta.request("UNPROT_DEAUTH")
    if "OK" not in deauth_result:
        raise Exception("Triggering SA Query from the STA failed")
    check_ocv_failure(sta, "SA Query Response", "saqueryresp", bssid)
1079
def test_wpa2_ocv_ap_override_fils_assoc(dev, apdev, params):
    """OCV on 2.4 GHz and AP override FILS association"""
    # Variant with the override included in the initial AP configuration
    run_wpa2_ocv_ap_override_fils_assoc(dev, apdev, params, post_enable=False)
1083
def test_wpa2_ocv_ap_override_fils_assoc_post_enable(dev, apdev, params):
    """OCV on 2.4 GHz and AP override FILS association (post enable)"""
    # Variant with the override applied after the AP has been started
    run_wpa2_ocv_ap_override_fils_assoc(dev, apdev, params, post_enable=True)
1087
def run_wpa2_ocv_ap_override_fils_assoc(dev, apdev, params, post_enable=False):
    """Run the AP-side OCI override test for FILS association.

    dev, apdev: station and AP device lists; only index 0 of each is used
    params: test parameters; only params['logdir'] is read (location of the
        msk.lst dump passed to start_erp_as())
    post_enable: when False the override is part of the configuration the AP
        is started with; when True it is set on the running AP instead.

    The station first connects normally, disconnects, and then re-selects
    the network; an OCV failure for frame "fils-assoc" from the AP's address
    must be reported on that second attempt.

    Raises HwsimSkip if hostapd rejects the ocv parameter, Exception on an
    unexpected outcome.
    """
    sta = dev[0]
    check_fils_capa(sta)
    check_erp_capa(sta)

    start_erp_as(msk_dump=os.path.join(params['logdir'], "msk.lst"))

    ssid = "test-wpa2-ocv"
    # Kept separate from the 'params' argument, which holds test parameters
    ap_params = hostapd.wpa2_eap_params(ssid=ssid)
    ap_params['wpa_key_mgmt'] = "FILS-SHA256"
    ap_params['auth_server_port'] = "18128"
    ap_params['erp_send_reauth_start'] = '1'
    ap_params['erp_domain'] = 'example.com'
    ap_params['fils_realm'] = 'example.com'
    ap_params['wpa_group_rekey'] = '1'
    ap_params["ieee80211w"] = "2"
    ap_params["ocv"] = "1"
    if not post_enable:
        ap_params["oci_freq_override_fils_assoc"] = "2462"
    try:
        hapd = hostapd.add_ap(apdev[0], ap_params)
    except Exception as e:
        if "Failed to set hostapd parameter ocv" in str(e):
            raise HwsimSkip("OCV not supported")
        raise
    bssid = hapd.own_addr()
    if post_enable:
        hapd.set("oci_freq_override_fils_assoc", "2462")

    sta.request("ERP_FLUSH")
    network_id = sta.connect(ssid, key_mgmt="FILS-SHA256",
                             eap="PSK", identity="psk.user@example.com",
                             password_hex="0123456789abcdef0123456789abcdef",
                             erp="1", scan_freq="2412", ocv="1",
                             ieee80211w="2")

    sta.request("DISCONNECT")
    sta.wait_disconnected()

    # Reconnect to the same network; the OCV failure is expected on this
    # second association
    sta.dump_monitor()
    sta.select_network(network_id, freq=2412)

    check_ocv_failure(sta, "FILS Association Response", "fils-assoc", bssid)
    sta.request("DISCONNECT")
1130
def test_wpa2_ocv_ap_override_ft_assoc(dev, apdev):
    """OCV on 2.4 GHz and AP override FT reassociation"""
    # Variant with the override included in the initial AP configuration
    run_wpa2_ocv_ap_override_ft_assoc(dev, apdev, post_enable=False)
1134
def test_wpa2_ocv_ap_override_ft_assoc_post_enable(dev, apdev):
    """OCV on 2.4 GHz and AP override FT reassociation (post enable)"""
    # Variant with the override applied after both APs have been started
    run_wpa2_ocv_ap_override_ft_assoc(dev, apdev, post_enable=True)
1138
def run_wpa2_ocv_ap_override_ft_assoc(dev, apdev, post_enable=False):
    """Run the AP-side OCI override test for FT reassociation.

    Two APs are started from ft_params1()/ft_params2() with PMF required and
    OCV enabled. The station connects to one of them and is then asked to
    ROAM to the other; an OCV failure for frame "ft-assoc" from the roam
    target must be reported on the station.

    post_enable: when False the override is part of the configuration both
        APs are started with; when True it is set on the running APs instead.

    Raises HwsimSkip if hostapd rejects the ocv parameter on the first AP.
    """
    ssid = "test-wpa2-ocv"
    passphrase = "qwertyuiop"
    override_name = "oci_freq_override_ft_assoc"
    override_freq = "2462"

    def with_ocv(ap_params):
        # Settings shared by both APs, added on top of the FT parameters
        ap_params["ieee80211w"] = "2"
        ap_params["ocv"] = "1"
        if not post_enable:
            ap_params[override_name] = override_freq
        return ap_params

    params = with_ocv(ft_params1(ssid=ssid, passphrase=passphrase))
    try:
        hapd0 = hostapd.add_ap(apdev[0], params)
    except Exception as e:
        if "Failed to set hostapd parameter ocv" in str(e):
            raise HwsimSkip("OCV not supported")
        raise
    params = with_ocv(ft_params2(ssid=ssid, passphrase=passphrase))
    hapd1 = hostapd.add_ap(apdev[1], params)

    if post_enable:
        for ap in (hapd0, hapd1):
            ap.set(override_name, override_freq)

    sta = dev[0]
    sta.connect(ssid, key_mgmt="FT-PSK", psk=passphrase,
                scan_freq="2412", ocv="1", ieee80211w="2")

    # Roam to whichever AP the station is not currently associated with
    current = sta.get_status_field("bssid")
    bssid0 = hapd0.own_addr()
    bssid1 = hapd1.own_addr()
    if current == bssid1:
        target = bssid0
    else:
        target = bssid1

    sta.scan_for_bss(target, freq="2412")
    if "OK" not in sta.request("ROAM " + target):
        raise Exception("ROAM failed")

    check_ocv_failure(sta, "FT Reassociation Response", "ft-assoc", target)
    sta.request("DISCONNECT")
1178
@remote_compatible
def test_wpa2_ocv_no_pmf(dev, apdev):
    """OCV on 2.4 GHz and no PMF on STA"""
    ap_params = dict(channel="1", ieee80211w="1", ocv="1")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)

    sta = dev[0]
    # Hand-built IE installed through TEST_ASSOC_IE; the STA network itself
    # is configured with ocv=0 and ieee80211w=0
    assoc_ie = "301a0100000fac040100000fac040100000fac0200400000000fac06"
    if "OK" not in sta.request("TEST_ASSOC_IE " + assoc_ie):
        raise Exception("Could not set TEST_ASSOC_IE")
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="0",
                ieee80211w="0", wait_connect=False)
    outcome = sta.wait_event(["CTRL-EVENT-CONNECTED",
                              "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
    # Stop the connection attempt before evaluating the outcome
    sta.request("DISCONNECT")
    if outcome is None:
        raise Exception("No connection result seen")
    if "CTRL-EVENT-CONNECTED" in outcome:
        raise Exception("Unexpected connection")
    if "status_code=31" not in outcome:
        raise Exception("Unexpected status code: " + outcome)
1200
@remote_compatible
def test_wpa2_ocv_no_pmf_workaround(dev, apdev):
    """OCV on 2.4 GHz and no PMF on STA with workaround"""
    # Same STA setup as the non-workaround test, but the AP uses ocv=2
    ap_params = dict(channel="1", ieee80211w="1", ocv="2")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)

    sta = dev[0]
    assoc_ie = "301a0100000fac040100000fac040100000fac0200400000000fac06"
    if "OK" not in sta.request("TEST_ASSOC_IE " + assoc_ie):
        raise Exception("Could not set TEST_ASSOC_IE")
    # connect() waits for the connection by default, so success is required
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="0",
                ieee80211w="0")
1213
@remote_compatible
def test_wpa2_ocv_no_oci(dev, apdev):
    """OCV on 2.4 GHz and no OCI from STA"""
    params = {"channel": "1",
              "ieee80211w": "1",
              "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
    # Hand-built IE installed through TEST_ASSOC_IE; the STA network itself
    # is configured with ocv=0
    ie = "301a0100000fac040100000fac040100000fac0280400000000fac06"
    if "OK" not in dev[0].request("TEST_ASSOC_IE " + ie):
        raise Exception("Could not set TEST_ASSOC_IE")
    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", ocv="0",
                   ieee80211w="1", wait_connect=False)

    # The AP must flag the missing OCI in EAPOL-Key msg 2/4
    ev = hapd.wait_event(["OCV-FAILURE"], timeout=10)
    if ev is None:
        raise Exception("No OCV failure reported")
    if "frame=eapol-key-m2 error=did not receive mandatory OCI" not in ev:
        raise Exception("Unexpected error: " + ev)

    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED",
                            "WPA: 4-Way Handshake failed"], timeout=10)
    dev[0].request("DISCONNECT")
    # The timeout case has to be checked first: a substring test on None
    # would raise TypeError and hide the intended error message.
    if ev is None:
        raise Exception("4-way handshake failure not reported")
    if "CTRL-EVENT-CONNECTED" in ev:
        raise Exception("Unexpected connection")
1238
@remote_compatible
def test_wpa2_ocv_no_oci_workaround(dev, apdev):
    """OCV on 2.4 GHz and no OCI from STA with workaround"""
    # Same STA setup as the non-workaround test, but the AP uses ocv=2
    ap_params = dict(channel="1", ieee80211w="1", ocv="2")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)

    sta = dev[0]
    assoc_ie = "301a0100000fac040100000fac040100000fac0280400000000fac06"
    if "OK" not in sta.request("TEST_ASSOC_IE " + assoc_ie):
        raise Exception("Could not set TEST_ASSOC_IE")
    # connect() waits for the connection by default, so success is required
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="0",
                ieee80211w="1")
1251
def test_wpa2_ocv_without_pmf(dev, apdev):
    """OCV without PMF"""
    ap_params = dict(channel="6", ieee80211n="1", ieee80211w="1", ocv="1")
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)

    # Turn PMF off on the stopped AP; re-enabling with OCV still on must fail
    hapd.disable()
    hapd.set("ieee80211w", "0")
    enable_result = hapd.request("ENABLE")
    if "FAIL" not in enable_result:
        raise Exception("OCV without PMF accepted")
1263