1#!/usr/bin/env python 2# 3# Copyright (c) 2019, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28''''OpenThread Sniffer API implementation''' 29 30import os 31import subprocess 32from GRLLibs.UtilityModules.ModuleHelper import ModuleHelper 33from ISniffer import ISniffer 34 35 36class OT_Sniffer(ISniffer): 37 38 def __init__(self, **kwargs): 39 try: 40 self.channel = kwargs.get('channel', 11) 41 self.port = kwargs.get('addressofDevice') 42 self.subprocess = None 43 self.is_active = False 44 45 except Exception as e: 46 ModuleHelper.WriteIntoDebugLogger('OT_Sniffer: [initialize] --> ' + str(e)) 47 48 def discoverSniffer(self): 49 sniffers = [] 50 51 p_discover = subprocess.Popen('extcap_ot.bat --extcap-interfaces', stdout=subprocess.PIPE, shell=True) 52 for line in p_discover.stdout.readlines(): 53 if line.startswith('interface'): 54 try: 55 # e.g. interface {value=COM10:460800}{display=OpenThread Sniffer COM10} 56 interface_port = line[line.index('value=') + 6:line.index('}{display')] 57 sniffers.append(OT_Sniffer(addressofDevice=interface_port, channel=ModuleHelper.Default_Channel)) 58 except Exception as e: 59 ModuleHelper.WriteIntoDebugLogger('OT_Sniffer: [discoverSniffer] --> Error: ' + str(e)) 60 61 p_discover.wait() 62 return sniffers 63 64 def startSniffer(self, channelToCapture, captureFileLocation): 65 """ 66 Method for starting the sniffer capture on given channel and this should create wireshark 'pcapng' file at the 67 given location. Capture should happen in background so that method call will be non-blocking and asynchronous. 68 @param channelToCapture : int : channel number to start the capture 69 @param captureFileLocation : string : Full path with the filename with extension is passed. 70 """ 71 try: 72 # start sniffer 73 self.setChannel(channelToCapture) 74 p_where = subprocess.Popen( 75 'py -3 -c "import sys; print(sys.executable)"', 76 stdout=subprocess.PIPE, 77 shell=True, 78 ) 79 # python_exe: e.g. C:\Python37\python.exe 80 python_exe = p_where.stdout.readline().strip() 81 82 if python_exe.endswith(".exe"): 83 # sniffer_py: e.g. C:\Python37\Scripts\sniffer.py 84 sniffer_py = str(os.path.dirname(python_exe)) + '\\Scripts\\sniffer.py' 85 86 cmd = [ 87 python_exe, 88 sniffer_py, 89 '-c', 90 str(self.channel), 91 '-u', 92 str(self.port.split(':')[0]), 93 '-b', 94 str(self.port.split(':')[1]), 95 '--crc', 96 '-o', 97 captureFileLocation, 98 ] 99 self.is_active = True 100 ModuleHelper.WriteIntoDebugLogger('OT_Sniffer: [cmd] --> %s' % str(cmd)) 101 self.subprocess = subprocess.Popen(cmd) 102 103 except Exception as e: 104 ModuleHelper.WriteIntoDebugLogger('OT_Sniffer: [startSniffer] --> Error: ' + str(e)) 105 106 def stopSniffer(self): 107 """ 108 Method for ending the sniffer capture. 109 Should stop background capturing, No further file I/O in capture file. 110 """ 111 if self.is_active: 112 self.is_active = False 113 if self.subprocess: 114 self.subprocess.terminate() 115 self.subprocess.wait() 116 117 def setChannel(self, channelToCapture): 118 """ 119 Method for changing sniffer capture 120 @param channelToCapture : int : 121 channel number is passed to change the channel which is set during the constructor call. 122 """ 123 self.channel = channelToCapture 124 125 def getChannel(self): 126 """ 127 Method to query sniffer for the channel it is sniffing on 128 @return : int : current capture channel of this sniffer instance. 129 """ 130 return self.channel 131 132 def validateFirmwareVersion(self, addressofDevice): 133 """ 134 Method to query sniffer firmware version details. 135 Shall be used while discoverSniffer() to validate the sniffer firmware. 136 @param addressofDevice : string : 137 serial com port or IP address,shall be None if need to verify own fireware version. 138 @return : bool : True if expected firmware is found , False if not 139 """ 140 return True 141 142 def isSnifferCapturing(self): 143 """ 144 method that will return true when sniffer device is capturing 145 @return : bool 146 """ 147 return self.is_active 148 149 def getSnifferAddress(self): 150 """ 151 Method to return the current sniffer's COM/IP address 152 @return : string 153 """ 154 return self.port 155 156 def globalReset(self): 157 """Method to reset all the global and class varibaled""" 158 pass 159