1# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15
16# APIs for interpreting and creating protobuf packets for Wi-Fi Scanning
17
18from __future__ import print_function
19
20import proto
21import utils
22from future.utils import tobytes
23
24
25def print_verbose(security_ctx, data):
26    if (security_ctx.verbose):
27        print('++++ ' + data + ' ++++')
28
29
30def scan_start_request(security_ctx, blocking=True, passive=False, group_channels=5, period_ms=120):
31    # Form protobuf request packet for ScanStart command
32    cmd = proto.wifi_scan_pb2.WiFiScanPayload()
33    cmd.msg = proto.wifi_scan_pb2.TypeCmdScanStart
34    cmd.cmd_scan_start.blocking = blocking
35    cmd.cmd_scan_start.passive = passive
36    cmd.cmd_scan_start.group_channels = group_channels
37    cmd.cmd_scan_start.period_ms = period_ms
38    enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1')
39    print_verbose(security_ctx, 'Client -> Device (Encrypted CmdScanStart) ' + utils.str_to_hexstr(enc_cmd))
40    return enc_cmd
41
42
43def scan_start_response(security_ctx, response_data):
44    # Interpret protobuf response packet from ScanStart command
45    dec_resp = security_ctx.decrypt_data(tobytes(response_data))
46    resp = proto.wifi_scan_pb2.WiFiScanPayload()
47    resp.ParseFromString(dec_resp)
48    print_verbose(security_ctx, 'ScanStart status ' + str(resp.status))
49    if resp.status != 0:
50        raise RuntimeError
51
52
53def scan_status_request(security_ctx):
54    # Form protobuf request packet for ScanStatus command
55    cmd = proto.wifi_scan_pb2.WiFiScanPayload()
56    cmd.msg = proto.wifi_scan_pb2.TypeCmdScanStatus
57    enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1')
58    print_verbose(security_ctx, 'Client -> Device (Encrypted CmdScanStatus) ' + utils.str_to_hexstr(enc_cmd))
59    return enc_cmd
60
61
62def scan_status_response(security_ctx, response_data):
63    # Interpret protobuf response packet from ScanStatus command
64    dec_resp = security_ctx.decrypt_data(tobytes(response_data))
65    resp = proto.wifi_scan_pb2.WiFiScanPayload()
66    resp.ParseFromString(dec_resp)
67    print_verbose(security_ctx, 'ScanStatus status ' + str(resp.status))
68    if resp.status != 0:
69        raise RuntimeError
70    return {'finished': resp.resp_scan_status.scan_finished, 'count': resp.resp_scan_status.result_count}
71
72
73def scan_result_request(security_ctx, index, count):
74    # Form protobuf request packet for ScanResult command
75    cmd = proto.wifi_scan_pb2.WiFiScanPayload()
76    cmd.msg = proto.wifi_scan_pb2.TypeCmdScanResult
77    cmd.cmd_scan_result.start_index = index
78    cmd.cmd_scan_result.count = count
79    enc_cmd = security_ctx.encrypt_data(cmd.SerializeToString()).decode('latin-1')
80    print_verbose(security_ctx, 'Client -> Device (Encrypted CmdScanResult) ' + utils.str_to_hexstr(enc_cmd))
81    return enc_cmd
82
83
84def scan_result_response(security_ctx, response_data):
85    # Interpret protobuf response packet from ScanResult command
86    dec_resp = security_ctx.decrypt_data(tobytes(response_data))
87    resp = proto.wifi_scan_pb2.WiFiScanPayload()
88    resp.ParseFromString(dec_resp)
89    print_verbose(security_ctx, 'ScanResult status ' + str(resp.status))
90    if resp.status != 0:
91        raise RuntimeError
92    authmode_str = ['Open', 'WEP', 'WPA_PSK', 'WPA2_PSK', 'WPA_WPA2_PSK', 'WPA2_ENTERPRISE']
93    results = []
94    for entry in resp.resp_scan_result.entries:
95        results += [{'ssid': entry.ssid.decode('latin-1').rstrip('\x00'),
96                     'bssid': utils.str_to_hexstr(entry.bssid.decode('latin-1')),
97                     'channel': entry.channel,
98                     'rssi': entry.rssi,
99                     'auth': authmode_str[entry.auth]}]
100        print_verbose(security_ctx, 'ScanResult SSID    : ' + str(results[-1]['ssid']))
101        print_verbose(security_ctx, 'ScanResult BSSID   : ' + str(results[-1]['bssid']))
102        print_verbose(security_ctx, 'ScanResult Channel : ' + str(results[-1]['channel']))
103        print_verbose(security_ctx, 'ScanResult RSSI    : ' + str(results[-1]['rssi']))
104        print_verbose(security_ctx, 'ScanResult AUTH    : ' + str(results[-1]['auth']))
105    return results
106