1#!/usr/bin/env python
2#
3# Copyright (c) 2022, The OpenThread Authors.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8# 1. Redistributions of source code must retain the above copyright
9#    notice, this list of conditions and the following disclaimer.
10# 2. Redistributions in binary form must reproduce the above copyright
11#    notice, this list of conditions and the following disclaimer in the
12#    documentation and/or other materials provided with the distribution.
13# 3. Neither the name of the copyright holder nor the
14#    names of its contributors may be used to endorse or promote products
15#    derived from this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27# POSSIBILITY OF SUCH DAMAGE.
28#
29
30import grpc
31import ipaddress
32import itertools
33import json
34import netifaces
35import select
36import socket
37import struct
38import threading
39import time
40import win32api
41import winreg as wr
42
43from GRLLibs.UtilityModules.ModuleHelper import ModuleHelper
44from GRLLibs.UtilityModules.SnifferManager import SnifferManager
45from GRLLibs.UtilityModules.TopologyManager import TopologyManager
46from Sniffer.ISniffer import ISniffer
47from THCI.OpenThread import watched
48from simulation.Sniffer.proto import sniffer_pb2
49from simulation.Sniffer.proto import sniffer_pb2_grpc
50
51DISCOVERY_ADDR = ('ff02::114', 12345)
52IFNAME = ISniffer.ethernet_interface_name
53
54SCAN_TIME = 3
55
56# `socket.IPPROTO_IPV6` only exists in Python 3, so the constant is manually defined.
57IPPROTO_IPV6 = 41
58
59# The subkey represents the class of network adapter devices supported by the system,
60# which is used to filter physical network cards and avoid virtual network adapters.
61WINREG_KEY = r'SYSTEM\CurrentControlSet\Control\Network\{4d36e972-e325-11ce-bfc1-08002be10318}'
62
63
64# When Harness requires an RF shield box, it will pop up a message box via `ModuleHelper.UIMsgBox`
65# Replace the function to add and remove an RF enclosure simulation automatically without popping up a window
66def UIMsgBoxDecorator(UIMsgBox, replaceFuncs):
67
68    @staticmethod
69    def UIMsgBoxWrapper(msg='Confirm ??',
70                        title='User Input Required',
71                        inputRequired=False,
72                        default='',
73                        choices=None,
74                        timeout=None,
75                        isPrompt=False):
76        func = replaceFuncs.get((msg, title))
77        if func is None:
78            return UIMsgBox(msg, title, inputRequired, default, choices, timeout, isPrompt)
79        else:
80            return func()
81
82    return UIMsgBoxWrapper
83
84
85class DeviceManager:
86
87    def __init__(self):
88        deviceManager = TopologyManager.m_DeviceManager
89        self._devices = {'DUT': deviceManager.AutoDUTDeviceObject.THCI_Object}
90        for device_obj in deviceManager.DeviceList.values():
91            if device_obj.IsDeviceUsed:
92                self._devices[device_obj.DeviceTopologyInfo] = device_obj.THCI_Object
93
94    def __getitem__(self, deviceName):
95        device = self._devices.get(deviceName)
96        if device is None:
97            raise KeyError('Device Name "%s" not found' % deviceName)
98        return device
99
100
101class RFEnclosureManager:
102
103    def __init__(self, deviceNames1, deviceNames2, snifferPartitionId):
104        self.deviceNames1 = deviceNames1
105        self.deviceNames2 = deviceNames2
106        self.snifferPartitionId = snifferPartitionId
107
108    def placeRFEnclosure(self):
109        devices = DeviceManager()
110        self._partition1 = [devices[role] for role in self.deviceNames1]
111        self._partition2 = [devices[role] for role in self.deviceNames2]
112        sniffer_denied_partition = self._partition1 if self.snifferPartitionId == 2 else self._partition2
113
114        for device1 in self._partition1:
115            for device2 in self._partition2:
116                device1.addBlockedNodeId(device2.node_id)
117                device2.addBlockedNodeId(device1.node_id)
118
119        for sniffer in SnifferManager.SnifferObjects.values():
120            if sniffer.isSnifferCapturing():
121                sniffer.filterNodes(device.node_id for device in sniffer_denied_partition)
122
123    def removeRFEnclosure(self):
124        for device in itertools.chain(self._partition1, self._partition2):
125            device.clearBlockedNodeIds()
126
127        for sniffer in SnifferManager.SnifferObjects.values():
128            if sniffer.isSnifferCapturing():
129                sniffer.filterNodes(())
130
131        self._partition1 = None
132        self._partition2 = None
133
134
135class SimSniffer(ISniffer):
136    replaced_msgbox = False
137
138    @watched
139    def __init__(self, **kwargs):
140        self.channel = kwargs.get('channel')
141        self.addr_port = kwargs.get('addressofDevice')
142        self.is_active = False
143        self._local_pcapng_location = None
144
145        if self.addr_port is not None:
146            # Replace `ModuleHelper.UIMsgBox` only when simulation devices exist
147            self._replaceMsgBox()
148
149            self._sniffer = grpc.insecure_channel(self.addr_port)
150            self._stub = sniffer_pb2_grpc.SnifferStub(self._sniffer)
151
152            # Close the sniffer only when Harness exits
153            win32api.SetConsoleCtrlHandler(self.__disconnect, True)
154
155    @watched
156    def _replaceMsgBox(self):
157        # Replace the function only once
158        if SimSniffer.replaced_msgbox:
159            return
160        SimSniffer.replaced_msgbox = True
161
162        test_9_2_9_leader = RFEnclosureManager(['Router_1', 'Router_2'], ['DUT', 'Commissioner'], 1)
163        test_9_2_9_router = RFEnclosureManager(['DUT', 'Router_2'], ['Leader', 'Commissioner'], 1)
164        test_9_2_10_router = RFEnclosureManager(['Leader', 'Commissioner'], ['DUT', 'MED_1', 'SED_1'], 2)
165
166        # Alter the behavior of `ModuleHelper.UIMsgBox` only when it comes to the following test cases:
167        #   - Leader 9.2.9
168        #   - Router 9.2.9
169        #   - Router 9.2.10
170        ModuleHelper.UIMsgBox = UIMsgBoxDecorator(
171            ModuleHelper.UIMsgBox,
172            replaceFuncs={
173                ("Place [Router1, Router2 and Sniffer] <br/> or <br/>[DUT and Commissioner] <br/>in an RF enclosure ", "Shield Devices"):
174                    test_9_2_9_leader.placeRFEnclosure,
175                ("Remove [Router1, Router2 and Sniffer] <br/> or <br/>[DUT and Commissioner] <br/>from RF enclosure ", "Unshield Devices"):
176                    test_9_2_9_leader.removeRFEnclosure,
177                ("Place [DUT,Router2 and sniffer] <br/> or <br/>[Leader and Commissioner] <br/>in an RF enclosure ", "Shield Devices"):
178                    test_9_2_9_router.placeRFEnclosure,
179                ("Remove [DUT, Router2 and sniffer] <br/> or <br/>[Leader and Commissioner] <br/>from RF enclosure ", "Unshield Devices"):
180                    test_9_2_9_router.removeRFEnclosure,
181                ("Place the <br/> [Leader and Commissioner] devices <br/> in an RF enclosure ", "Shield Devices"):
182                    test_9_2_10_router.placeRFEnclosure,
183                ("Remove <br/>[Leader and Commissioner] <br/>from RF enclosure ", "Unshield Devices"):
184                    test_9_2_10_router.removeRFEnclosure,
185            })
186
187    def __repr__(self):
188        return '%r' % self.__dict__
189
190    def _get_connection_name_from_guid(self, iface_guids):
191        iface_names = ['(unknown)' for i in range(len(iface_guids))]
192        reg = wr.ConnectRegistry(None, wr.HKEY_LOCAL_MACHINE)
193        reg_key = wr.OpenKey(reg, WINREG_KEY)
194        for i in range(len(iface_guids)):
195            try:
196                reg_subkey = wr.OpenKey(reg_key, iface_guids[i] + r'\Connection')
197                iface_names[i] = wr.QueryValueEx(reg_subkey, 'Name')[0]
198            except Exception, e:
199                pass
200        return iface_names
201
202    def _find_index(self, iface_name):
203        ifaces_guid = netifaces.interfaces()
204        absolute_iface_name = self._get_connection_name_from_guid(ifaces_guid)
205        try:
206            _required_iface_index = absolute_iface_name.index(iface_name)
207            _required_guid = ifaces_guid[_required_iface_index]
208            ip = netifaces.ifaddresses(_required_guid)[netifaces.AF_INET6][-1]['addr']
209            self.log('Local IP: %s', ip)
210            return int(ip.split('%')[1])
211        except Exception, e:
212            self.log('%r', e)
213            self.log('Interface %s not found', iface_name)
214            return None
215
216    def _encode_address_port(self, addr, port):
217        port = str(port)
218        if isinstance(ipaddress.ip_address(addr), ipaddress.IPv6Address):
219            return '[' + addr + ']:' + port
220        return addr + ':' + port
221
222    @watched
223    def discoverSniffer(self):
224        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
225
226        # Define the output interface of the socket
227        ifn = self._find_index(IFNAME)
228        if ifn is None:
229            self.log('%s interface has not enabled IPv6.', IFNAME)
230            return []
231        ifn = struct.pack('I', ifn)
232        sock.setsockopt(IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, ifn)
233
234        # Send the request
235        sock.sendto(('Sniffer').encode(), DISCOVERY_ADDR)
236
237        # Scan for responses
238        devs = set()
239        start = time.time()
240        while time.time() - start < SCAN_TIME:
241            if select.select([sock], [], [], 1)[0]:
242                data, _ = sock.recvfrom(1024)
243                data = json.loads(data)
244                devs.add((data['add'], data['por']))
245            else:
246                # Re-send the request, due to unreliability of UDP especially on WLAN
247                sock.sendto(('Sniffer').encode(), DISCOVERY_ADDR)
248
249        devs = [SimSniffer(addressofDevice=self._encode_address_port(addr, port), channel=None) for addr, port in devs]
250        self.log('List of SimSniffers: %r', devs)
251
252        return devs
253
254    @watched
255    def startSniffer(self, channelToCapture, captureFileLocation, includeEthernet=False):
256        self.channel = channelToCapture
257        self._local_pcapng_location = captureFileLocation
258
259        response = self._stub.Start(sniffer_pb2.StartRequest(channel=self.channel, includeEthernet=includeEthernet))
260        if response.status != sniffer_pb2.OK:
261            raise RuntimeError('startSniffer error: %s' % sniffer_pb2.Status.Name(response.status))
262
263        self._thread = threading.Thread(target=self._file_sync_main_loop)
264        self._thread.setDaemon(True)
265        self._thread.start()
266
267        self.is_active = True
268
269    @watched
270    def _file_sync_main_loop(self):
271        with open(self._local_pcapng_location, 'wb') as f:
272            for response in self._stub.TransferPcapng(sniffer_pb2.TransferPcapngRequest()):
273                f.write(response.content)
274                f.flush()
275
276    @watched
277    def stopSniffer(self):
278        if not self.is_active:
279            return
280
281        response = self._stub.Stop(sniffer_pb2.StopRequest())
282        if response.status != sniffer_pb2.OK:
283            raise RuntimeError('stopSniffer error: %s' % sniffer_pb2.Status.Name(response.status))
284
285        self._thread.join()
286
287        self.is_active = False
288
289    @watched
290    def filterNodes(self, nodeids):
291        if not self.is_active:
292            return
293
294        request = sniffer_pb2.FilterNodesRequest()
295        request.nodeids.extend(nodeids)
296
297        response = self._stub.FilterNodes(request)
298        if response.status != sniffer_pb2.OK:
299            raise RuntimeError('filterNodes error: %s' % sniffer_pb2.Status.Name(response.status))
300
301    def __disconnect(self, dwCtrlType):
302        if self._sniffer is not None:
303            self._sniffer.close()
304
305    @watched
306    def setChannel(self, channelToCapture):
307        self.channel = channelToCapture
308
309    @watched
310    def getChannel(self):
311        return self.channel
312
313    @watched
314    def validateFirmwareVersion(self, device):
315        return True
316
317    @watched
318    def isSnifferCapturing(self):
319        return self.is_active
320
321    @watched
322    def getSnifferAddress(self):
323        return self.addr_port
324
325    @watched
326    def globalReset(self):
327        pass
328
329    def log(self, fmt, *args):
330        try:
331            msg = fmt % args
332            print('%s - %s - %s' % ('SimSniffer', time.strftime('%b %d %H:%M:%S'), msg))
333        except Exception:
334            pass
335