1#!/usr/bin/env python 2# 3# Copyright (c) 2022, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import grpc 31import ipaddress 32import itertools 33import json 34import netifaces 35import select 36import socket 37import struct 38import threading 39import time 40import win32api 41import winreg as wr 42 43from GRLLibs.UtilityModules.ModuleHelper import ModuleHelper 44from GRLLibs.UtilityModules.SnifferManager import SnifferManager 45from GRLLibs.UtilityModules.TopologyManager import TopologyManager 46from Sniffer.ISniffer import ISniffer 47from THCI.OpenThread import watched 48from simulation.Sniffer.proto import sniffer_pb2 49from simulation.Sniffer.proto import sniffer_pb2_grpc 50 51DISCOVERY_ADDR = ('ff02::114', 12345) 52IFNAME = ISniffer.ethernet_interface_name 53 54SCAN_TIME = 3 55 56# `socket.IPPROTO_IPV6` only exists in Python 3, so the constant is manually defined. 57IPPROTO_IPV6 = 41 58 59# The subkey represents the class of network adapter devices supported by the system, 60# which is used to filter physical network cards and avoid virtual network adapters. 61WINREG_KEY = r'SYSTEM\CurrentControlSet\Control\Network\{4d36e972-e325-11ce-bfc1-08002be10318}' 62 63 64# When Harness requires an RF shield box, it will pop up a message box via `ModuleHelper.UIMsgBox` 65# Replace the function to add and remove an RF enclosure simulation automatically without popping up a window 66def UIMsgBoxDecorator(UIMsgBox, replaceFuncs): 67 68 @staticmethod 69 def UIMsgBoxWrapper(msg='Confirm ??', 70 title='User Input Required', 71 inputRequired=False, 72 default='', 73 choices=None, 74 timeout=None, 75 isPrompt=False): 76 func = replaceFuncs.get((msg, title)) 77 if func is None: 78 return UIMsgBox(msg, title, inputRequired, default, choices, timeout, isPrompt) 79 else: 80 return func() 81 82 return UIMsgBoxWrapper 83 84 85class DeviceManager: 86 87 def __init__(self): 88 deviceManager = TopologyManager.m_DeviceManager 89 self._devices = {'DUT': deviceManager.AutoDUTDeviceObject.THCI_Object} 90 for device_obj in deviceManager.DeviceList.values(): 91 if device_obj.IsDeviceUsed: 92 self._devices[device_obj.DeviceTopologyInfo] = device_obj.THCI_Object 93 94 def __getitem__(self, deviceName): 95 device = self._devices.get(deviceName) 96 if device is None: 97 raise KeyError('Device Name "%s" not found' % deviceName) 98 return device 99 100 101class RFEnclosureManager: 102 103 def __init__(self, deviceNames1, deviceNames2, snifferPartitionId): 104 self.deviceNames1 = deviceNames1 105 self.deviceNames2 = deviceNames2 106 self.snifferPartitionId = snifferPartitionId 107 108 def placeRFEnclosure(self): 109 devices = DeviceManager() 110 self._partition1 = [devices[role] for role in self.deviceNames1] 111 self._partition2 = [devices[role] for role in self.deviceNames2] 112 sniffer_denied_partition = self._partition1 if self.snifferPartitionId == 2 else self._partition2 113 114 for device1 in self._partition1: 115 for device2 in self._partition2: 116 device1.addBlockedNodeId(device2.node_id) 117 device2.addBlockedNodeId(device1.node_id) 118 119 for sniffer in SnifferManager.SnifferObjects.values(): 120 if sniffer.isSnifferCapturing(): 121 sniffer.filterNodes(device.node_id for device in sniffer_denied_partition) 122 123 def removeRFEnclosure(self): 124 for device in itertools.chain(self._partition1, self._partition2): 125 device.clearBlockedNodeIds() 126 127 for sniffer in SnifferManager.SnifferObjects.values(): 128 if sniffer.isSnifferCapturing(): 129 sniffer.filterNodes(()) 130 131 self._partition1 = None 132 self._partition2 = None 133 134 135class SimSniffer(ISniffer): 136 replaced_msgbox = False 137 138 @watched 139 def __init__(self, **kwargs): 140 self.channel = kwargs.get('channel') 141 self.addr_port = kwargs.get('addressofDevice') 142 self.is_active = False 143 self._local_pcapng_location = None 144 145 if self.addr_port is not None: 146 # Replace `ModuleHelper.UIMsgBox` only when simulation devices exist 147 self._replaceMsgBox() 148 149 self._sniffer = grpc.insecure_channel(self.addr_port) 150 self._stub = sniffer_pb2_grpc.SnifferStub(self._sniffer) 151 152 # Close the sniffer only when Harness exits 153 win32api.SetConsoleCtrlHandler(self.__disconnect, True) 154 155 @watched 156 def _replaceMsgBox(self): 157 # Replace the function only once 158 if SimSniffer.replaced_msgbox: 159 return 160 SimSniffer.replaced_msgbox = True 161 162 test_9_2_9_leader = RFEnclosureManager(['Router_1', 'Router_2'], ['DUT', 'Commissioner'], 1) 163 test_9_2_9_router = RFEnclosureManager(['DUT', 'Router_2'], ['Leader', 'Commissioner'], 1) 164 test_9_2_10_router = RFEnclosureManager(['Leader', 'Commissioner'], ['DUT', 'MED_1', 'SED_1'], 2) 165 166 # Alter the behavior of `ModuleHelper.UIMsgBox` only when it comes to the following test cases: 167 # - Leader 9.2.9 168 # - Router 9.2.9 169 # - Router 9.2.10 170 ModuleHelper.UIMsgBox = UIMsgBoxDecorator( 171 ModuleHelper.UIMsgBox, 172 replaceFuncs={ 173 ("Place [Router1, Router2 and Sniffer] <br/> or <br/>[DUT and Commissioner] <br/>in an RF enclosure ", "Shield Devices"): 174 test_9_2_9_leader.placeRFEnclosure, 175 ("Remove [Router1, Router2 and Sniffer] <br/> or <br/>[DUT and Commissioner] <br/>from RF enclosure ", "Unshield Devices"): 176 test_9_2_9_leader.removeRFEnclosure, 177 ("Place [DUT,Router2 and sniffer] <br/> or <br/>[Leader and Commissioner] <br/>in an RF enclosure ", "Shield Devices"): 178 test_9_2_9_router.placeRFEnclosure, 179 ("Remove [DUT, Router2 and sniffer] <br/> or <br/>[Leader and Commissioner] <br/>from RF enclosure ", "Unshield Devices"): 180 test_9_2_9_router.removeRFEnclosure, 181 ("Place the <br/> [Leader and Commissioner] devices <br/> in an RF enclosure ", "Shield Devices"): 182 test_9_2_10_router.placeRFEnclosure, 183 ("Remove <br/>[Leader and Commissioner] <br/>from RF enclosure ", "Unshield Devices"): 184 test_9_2_10_router.removeRFEnclosure, 185 }) 186 187 def __repr__(self): 188 return '%r' % self.__dict__ 189 190 def _get_connection_name_from_guid(self, iface_guids): 191 iface_names = ['(unknown)' for i in range(len(iface_guids))] 192 reg = wr.ConnectRegistry(None, wr.HKEY_LOCAL_MACHINE) 193 reg_key = wr.OpenKey(reg, WINREG_KEY) 194 for i in range(len(iface_guids)): 195 try: 196 reg_subkey = wr.OpenKey(reg_key, iface_guids[i] + r'\Connection') 197 iface_names[i] = wr.QueryValueEx(reg_subkey, 'Name')[0] 198 except Exception, e: 199 pass 200 return iface_names 201 202 def _find_index(self, iface_name): 203 ifaces_guid = netifaces.interfaces() 204 absolute_iface_name = self._get_connection_name_from_guid(ifaces_guid) 205 try: 206 _required_iface_index = absolute_iface_name.index(iface_name) 207 _required_guid = ifaces_guid[_required_iface_index] 208 ip = netifaces.ifaddresses(_required_guid)[netifaces.AF_INET6][-1]['addr'] 209 self.log('Local IP: %s', ip) 210 return int(ip.split('%')[1]) 211 except Exception, e: 212 self.log('%r', e) 213 self.log('Interface %s not found', iface_name) 214 return None 215 216 def _encode_address_port(self, addr, port): 217 port = str(port) 218 if isinstance(ipaddress.ip_address(addr), ipaddress.IPv6Address): 219 return '[' + addr + ']:' + port 220 return addr + ':' + port 221 222 @watched 223 def discoverSniffer(self): 224 sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 225 226 # Define the output interface of the socket 227 ifn = self._find_index(IFNAME) 228 if ifn is None: 229 self.log('%s interface has not enabled IPv6.', IFNAME) 230 return [] 231 ifn = struct.pack('I', ifn) 232 sock.setsockopt(IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, ifn) 233 234 # Send the request 235 sock.sendto(('Sniffer').encode(), DISCOVERY_ADDR) 236 237 # Scan for responses 238 devs = set() 239 start = time.time() 240 while time.time() - start < SCAN_TIME: 241 if select.select([sock], [], [], 1)[0]: 242 data, _ = sock.recvfrom(1024) 243 data = json.loads(data) 244 devs.add((data['add'], data['por'])) 245 else: 246 # Re-send the request, due to unreliability of UDP especially on WLAN 247 sock.sendto(('Sniffer').encode(), DISCOVERY_ADDR) 248 249 devs = [SimSniffer(addressofDevice=self._encode_address_port(addr, port), channel=None) for addr, port in devs] 250 self.log('List of SimSniffers: %r', devs) 251 252 return devs 253 254 @watched 255 def startSniffer(self, channelToCapture, captureFileLocation, includeEthernet=False): 256 self.channel = channelToCapture 257 self._local_pcapng_location = captureFileLocation 258 259 response = self._stub.Start(sniffer_pb2.StartRequest(channel=self.channel, includeEthernet=includeEthernet)) 260 if response.status != sniffer_pb2.OK: 261 raise RuntimeError('startSniffer error: %s' % sniffer_pb2.Status.Name(response.status)) 262 263 self._thread = threading.Thread(target=self._file_sync_main_loop) 264 self._thread.setDaemon(True) 265 self._thread.start() 266 267 self.is_active = True 268 269 @watched 270 def _file_sync_main_loop(self): 271 with open(self._local_pcapng_location, 'wb') as f: 272 for response in self._stub.TransferPcapng(sniffer_pb2.TransferPcapngRequest()): 273 f.write(response.content) 274 f.flush() 275 276 @watched 277 def stopSniffer(self): 278 if not self.is_active: 279 return 280 281 response = self._stub.Stop(sniffer_pb2.StopRequest()) 282 if response.status != sniffer_pb2.OK: 283 raise RuntimeError('stopSniffer error: %s' % sniffer_pb2.Status.Name(response.status)) 284 285 self._thread.join() 286 287 self.is_active = False 288 289 @watched 290 def filterNodes(self, nodeids): 291 if not self.is_active: 292 return 293 294 request = sniffer_pb2.FilterNodesRequest() 295 request.nodeids.extend(nodeids) 296 297 response = self._stub.FilterNodes(request) 298 if response.status != sniffer_pb2.OK: 299 raise RuntimeError('filterNodes error: %s' % sniffer_pb2.Status.Name(response.status)) 300 301 def __disconnect(self, dwCtrlType): 302 if self._sniffer is not None: 303 self._sniffer.close() 304 305 @watched 306 def setChannel(self, channelToCapture): 307 self.channel = channelToCapture 308 309 @watched 310 def getChannel(self): 311 return self.channel 312 313 @watched 314 def validateFirmwareVersion(self, device): 315 return True 316 317 @watched 318 def isSnifferCapturing(self): 319 return self.is_active 320 321 @watched 322 def getSnifferAddress(self): 323 return self.addr_port 324 325 @watched 326 def globalReset(self): 327 pass 328 329 def log(self, fmt, *args): 330 try: 331 msg = fmt % args 332 print('%s - %s - %s' % ('SimSniffer', time.strftime('%b %d %H:%M:%S'), msg)) 333 except Exception: 334 pass 335