1#!/usr/bin/env python3 2# 3# Copyright (c) 2022, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#

import argparse
from concurrent import futures
import enum
import fcntl
import grpc
import logging
import os
import signal
import socket
import subprocess
import tempfile
import threading
import time

import pcap_codec
from proto import sniffer_pb2
from proto import sniffer_pb2_grpc
import sniffer_transport


class CaptureState(enum.Flag):
    """ Bit flags describing which capture sources are currently active. """

    NONE = 0
    THREAD = enum.auto()
    ETHERNET = enum.auto()


class SnifferServicer(sniffer_pb2_grpc.Sniffer):
    """ Class representing the Sniffing node, whose main task is listening. """

    logger = logging.getLogger('sniffer.SnifferServicer')

    # Maximum number of bytes requested from the transport per `recv` call
    RECV_BUFFER_SIZE = 4096
    # Polling interval (in seconds) used for transport reads and for tshark output
    TIMEOUT = 0.1

    def _reset(self):
        """ Put the servicer back into the idle state and drop all per-capture resources. """

        self._state = CaptureState.NONE
        self._pcap = None
        self._denied_nodeids = None
        self._transport = None
        self._thread = None
        self._thread_alive.clear()
        self._file_sync_done.clear()
        self._tshark_proc = None
        # Paths of the named pipe fed to tshark and of the temporary directory holding it
        self._fifo_name = None
        self._fifo_dir = None

    def __init__(self, max_nodes_num):
        """ Create an idle servicer.

        `max_nodes_num` is the largest node ID accepted by `FilterNodes`.
        """

        self._max_nodes_num = max_nodes_num
        self._thread_alive = threading.Event()
        self._file_sync_done = threading.Event()
        self._nodeids_mutex = threading.Lock()  # for `self._denied_nodeids`
        self._reset()

    def Start(self, request, context):
        """ Start sniffing.

        Spawns tshark reading from a freshly created named pipe (and from `docker0` when
        `request.includeEthernet` is set), opens the transport and starts the background
        thread that feeds received frames into the pipe through the pcap codec.

        Returns a `StartResponse` with `OPERATION_ERROR` if a capture is already running,
        `OK` otherwise.
        """

        self.logger.debug('call Start')

        # Validate and change the state
        if self._state != CaptureState.NONE:
            return sniffer_pb2.StartResponse(status=sniffer_pb2.OPERATION_ERROR)
        self._state = CaptureState.THREAD

        # Create a temporary named pipe; remember both paths so that `Stop` can delete them
        self._fifo_dir = tempfile.mkdtemp()
        fifo_name = os.path.join(self._fifo_dir, 'pcap.fifo')
        os.mkfifo(fifo_name)
        self._fifo_name = fifo_name

        cmd = ['tshark', '-i', fifo_name]
        if request.includeEthernet:
            self._state |= CaptureState.ETHERNET
            cmd += ['-i', 'docker0']
        cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']

        self.logger.debug('Running command: %s', ' '.join(cmd))
        self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        # `TransferPcapng` polls this pipe, so reads must not block while no packet is available
        self._set_nonblocking(self._tshark_proc.stdout.fileno())

        # Construct pcap codec after initiating tshark to avoid blocking
        self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name)

        # Sniffer all nodes in default, i.e. there is no RF enclosure
        self._denied_nodeids = set()

        # Create transport
        transport_factory = sniffer_transport.SnifferTransportFactory()
        self._transport = transport_factory.create_transport()

        # Start the sniffer main loop thread (`daemon=True` replaces the deprecated `setDaemon`)
        self._thread = threading.Thread(target=self._sniffer_main_loop, daemon=True)
        self._transport.open()
        self._thread_alive.set()
        self._thread.start()

        return sniffer_pb2.StartResponse(status=sniffer_pb2.OK)

    def _sniffer_main_loop(self):
        """ Sniffer main loop.

        Runs in the background thread until `self._thread_alive` is cleared, appending every
        frame received from a node that is not currently denied to the pcap codec.
        """

        while self._thread_alive.is_set():
            try:
                data, nodeid = self._transport.recv(self.RECV_BUFFER_SIZE, self.TIMEOUT)
            except socket.timeout:
                # Nothing received within TIMEOUT; loop again so the stop request is noticed
                continue

            # `FilterNodes` rebinds the set rather than mutating it, so taking a reference
            # under the lock is enough
            with self._nodeids_mutex:
                denied_nodeids = self._denied_nodeids

            # Equivalent to RF enclosure
            if nodeid not in denied_nodeids:
                self._pcap.append(data)

    def TransferPcapng(self, request, context):
        """ Transfer the capture file.

        This is a generator: it yields a `TransferPcapngResponse` for every chunk read from
        tshark's stdout until tshark terminates. If no capture is running, it yields nothing.
        """

        # Validate the state
        if self._state == CaptureState.NONE:
            # A value returned from a generator is not part of the yielded stream, so there
            # is no way to hand back an error message here; just end the stream empty.
            return

        try:
            # Synchronize the capture file
            while True:
                content = self._tshark_proc.stdout.read()
                if content is None:
                    # Currently no captured packets
                    time.sleep(self.TIMEOUT)
                elif content == b'':
                    # Reach EOF when tshark terminates
                    break
                else:
                    # Forward the captured packets
                    yield sniffer_pb2.TransferPcapngResponse(content=content)
        finally:
            # `Stop` blocks on this event; signal it even if the stream is abandoned or the
            # read fails, otherwise `Stop` would never return.
            self._file_sync_done.set()

    def _set_nonblocking(self, fd):
        """ Set `O_NONBLOCK` on the file descriptor `fd`.

        Raises `OSError` if either `fcntl` call fails.
        """

        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def FilterNodes(self, request, context):
        """ Replace the set of node IDs whose frames are dropped from the capture.

        Returns a `FilterNodesResponse` with `OPERATION_ERROR` if no capture is running,
        `VALUE_ERROR` if any ID in `request.nodeids` is outside `[1, max_nodes_num]`, and
        `OK` otherwise.
        """

        self.logger.debug('call FilterNodes')

        # Validate the state
        if not (self._state & CaptureState.THREAD):
            return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR)

        denied_nodeids = set(request.nodeids)
        # Validate the node IDs
        for nodeid in denied_nodeids:
            if not 1 <= nodeid <= self._max_nodes_num:
                return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR)

        with self._nodeids_mutex:
            self._denied_nodeids = denied_nodeids

        return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK)

    def _remove_fifo(self):
        """ Delete the named pipe and its temporary directory (best effort). """

        for remove, path in ((os.remove, self._fifo_name), (os.rmdir, self._fifo_dir)):
            if path is None:
                continue
            try:
                remove(path)
            except OSError as e:
                # Leftover temporary files must not make `Stop` fail
                self.logger.warning('Failed to remove %s: %s', path, e)

    def Stop(self, request, context):
        """ Stop sniffing.

        Stops the background thread, closes the transport and the pcap codec, terminates
        tshark and waits until `TransferPcapng` has drained its remaining output. The capture
        itself is delivered through `TransferPcapng`; this call only reports the status.

        Returns a `StopResponse` with `OPERATION_ERROR` if no capture is running, `OK`
        otherwise.
        """

        self.logger.debug('call Stop')

        # Validate and change the state
        if not (self._state & CaptureState.THREAD):
            return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR)
        self._state = CaptureState.NONE

        self._thread_alive.clear()
        self._thread.join()
        self._transport.close()
        self._pcap.close()

        self._tshark_proc.terminate()
        # `TransferPcapng` still reads from `self._tshark_proc`, which `_reset` sets to None,
        # so wait for it to finish before resetting
        self._file_sync_done.wait()
        self._tshark_proc.wait()

        self._remove_fifo()
        self._reset()

        return sniffer_pb2.StopResponse(status=sniffer_pb2.OK)


def serve(address_port, max_nodes_num):
    """ Run the sniffer gRPC server on `address_port` until SIGINT or SIGTERM is received. """

    # One worker is used for `Start`, `FilterNodes` and `Stop`
    # The other worker is used for `TransferPcapng`, which will be kept running by the client in a background thread
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
    sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server)
    # add_secure_port requires a web domain
    server.add_insecure_port(address_port)
    logging.info('server starts on %s', address_port)
    server.start()

    def exit_handler(signum, frame):
        # Stop the server with a grace period of 1 second
        server.stop(1)

    signal.signal(signal.SIGINT, exit_handler)
    signal.signal(signal.SIGTERM, exit_handler)

    server.wait_for_termination()


def run_sniffer():
    """ Parse the command line arguments and run the sniffer server. """

    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--grpc-server',
                        dest='grpc_server',
                        type=str,
                        required=True,
                        help='the address of the sniffer server')
    parser.add_argument('--max-nodes-num',
                        dest='max_nodes_num',
                        type=int,
                        required=True,
                        help='the maximum number of nodes')
    args = parser.parse_args()

    serve(args.grpc_server, args.max_nodes_num)


if __name__ == '__main__':
    run_sniffer()