1# Test a few kernel bugs and functionality
2# Copyright (c) 2016, Intel Deutschland GmbH
3#
4# Author: Johannes Berg <johannes.berg@intel.com>
5#
6# This software may be distributed under the terms of the BSD license.
7# See README for more details.
8
9import hostapd
10import binascii
11import os, time
12import struct
13import subprocess, re
14from test_wnm import expect_ack
15from tshark import run_tshark
16from utils import clear_regdom, long_duration_test
17from utils import HwsimSkip
18
19def _test_kernel_bss_leak(dev, apdev, deauth):
20    ssid = "test-bss-leak"
21    passphrase = 'qwertyuiop'
22    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
23    hapd = hostapd.add_ap(apdev[0], params)
24    hapd.set("ext_mgmt_frame_handling", "1")
25    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", wait_connect=False)
26    while True:
27        pkt = hapd.mgmt_rx()
28        if not pkt:
29            raise Exception("MGMT RX wait timed out for auth frame")
30        if pkt['fc'] & 0xc:
31            continue
32        if pkt['subtype'] == 0: # assoc request
33            if deauth:
34                # return a deauth immediately
35                hapd.mgmt_tx({
36                    'fc': 0xc0,
37                    'sa': pkt['da'],
38                    'da': pkt['sa'],
39                    'bssid': pkt['bssid'],
40                    'payload': b'\x01\x00',
41                })
42            break
43        else:
44            hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % (
45                         binascii.hexlify(pkt['frame']).decode(), ))
46    hapd.set("ext_mgmt_frame_handling", "0")
47
48    hapd.request("STOP_AP")
49
50    dev[0].request("REMOVE_NETWORK all")
51    dev[0].wait_disconnected()
52
53    dev[0].flush_scan_cache(freq=5180)
54    res = dev[0].request("SCAN_RESULTS")
55    if len(res.splitlines()) > 1:
56        raise Exception("BSS entry should no longer be around")
57
58def test_kernel_bss_leak_deauth(dev, apdev):
59    """cfg80211/mac80211 BSS leak on deauthentication"""
60    return _test_kernel_bss_leak(dev, apdev, deauth=True)
61
62def test_kernel_bss_leak_timeout(dev, apdev):
63    """cfg80211/mac80211 BSS leak on timeout"""
64    return _test_kernel_bss_leak(dev, apdev, deauth=False)
65
66MGMT_SUBTYPE_ACTION = 13
67
68def expect_no_ack(hapd):
69    ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
70    if ev is None:
71        raise Exception("Missing TX status")
72    if "ok=0" not in ev:
73        raise Exception("Action frame unexpectedly acknowledged")
74
75def test_kernel_unknown_action_frame_rejection_sta(dev, apdev, params):
76    """mac80211 and unknown Action frame rejection in STA mode"""
77    hapd = hostapd.add_ap(apdev[0], {"ssid": "unknown-action"})
78    dev[0].connect("unknown-action", key_mgmt="NONE", scan_freq="2412")
79    bssid = hapd.own_addr()
80    addr = dev[0].own_addr()
81
82    hapd.set("ext_mgmt_frame_handling", "1")
83
84    # Unicast Action frame with unknown category (response expected)
85    msg = {}
86    msg['fc'] = MGMT_SUBTYPE_ACTION << 4
87    msg['da'] = addr
88    msg['sa'] = bssid
89    msg['bssid'] = bssid
90    msg['payload'] = struct.pack("<BB", 0x70, 0)
91    hapd.mgmt_tx(msg)
92    expect_ack(hapd)
93
94    # Note: mac80211 does not allow group-addressed Action frames in unknown
95    # categories to be transmitted in AP mode, so for now, these steps are
96    # commented out.
97
98    # Multicast Action frame with unknown category (no response expected)
99    #msg['da'] = "01:ff:ff:ff:ff:ff"
100    #msg['payload'] = struct.pack("<BB", 0x71, 1)
101    #hapd.mgmt_tx(msg)
102    #expect_no_ack(hapd)
103
104    # Broadcast Action frame with unknown category (no response expected)
105    #msg['da'] = "ff:ff:ff:ff:ff:ff"
106    #msg['payload'] = struct.pack("<BB", 0x72, 2)
107    #hapd.mgmt_tx(msg)
108    #expect_no_ack(hapd)
109
110    # Unicast Action frame with error indication category (no response expected)
111    msg['da'] = addr
112    msg['payload'] = struct.pack("<BB", 0xf3, 3)
113    hapd.mgmt_tx(msg)
114    expect_ack(hapd)
115
116    # Unicast Action frame with unknown category (response expected)
117    msg['da'] = addr
118    msg['payload'] = struct.pack("<BB", 0x74, 4)
119    hapd.mgmt_tx(msg)
120    expect_ack(hapd)
121
122    out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
123                     "wlan.sa == %s && wlan.fc.type_subtype == 0x0d" % addr,
124                     display=["wlan_mgt.fixed.category_code"])
125    res = out.splitlines()
126    categ = [int(x) for x in res]
127
128    if 0xf2 in categ or 0xf3 in categ:
129        raise Exception("Unexpected Action frame rejection: " + str(categ))
130    if 0xf0 not in categ or 0xf4 not in categ:
131        raise Exception("Action frame rejection missing: " + str(categ))
132
133@long_duration_test
134def test_kernel_reg_disconnect(dev, apdev):
135    """Connect and force disconnect via regulatory"""
136    hapd = None
137    try:
138        ssid = "test-reg-disconnect"
139        passphrase = 'qwertyuiop'
140        params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
141        params["country_code"] = "DE"
142        params["hw_mode"] = "b"
143        params["channel"] = "13"
144        hapd = hostapd.add_ap(apdev[0], params)
145        dev[0].set("country", "DE")
146        dev[0].connect(ssid, psk=passphrase, scan_freq="2472")
147        dev[0].set("country", "US")
148        time.sleep(61)
149        dev[0].wait_disconnected(error="no regulatory disconnect")
150    finally:
151        dev[0].request("DISCONNECT")
152        clear_regdom(hapd, dev)
153        dev[0].set("country", "00")
154
155def test_kernel_kunit(dev, apdev):
156    """KUnit tests"""
157    modules = ('cfg80211-tests', 'mac80211-tests')
158    results = (subprocess.call(['modprobe', mod], stderr=subprocess.PIPE)
159               for mod in modules)
160
161    if all((r != 0 for r in results)):
162        raise HwsimSkip("KUnit tests not available")
163
164    dmesg = subprocess.check_output(['dmesg'])
165    fail_rx = re.compile(b'fail:[^0]')
166    for line in dmesg.split(b'\n'):
167        if fail_rx.search(line):
168            raise Exception(f'kunit test failed: {line.decode("utf-8")}')
169