#!/usr/bin/env python3
#
#  Copyright (c) 2022, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

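# Sniffer simulation server: a gRPC service whose `Start`, `FilterNodes`,
# `TransferPcapng` and `Stop` methods drive a tshark-based capture of simulated
# Thread radio traffic and, optionally, Ethernet traffic on docker0.
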
import argparse
from concurrent import futures
import enum
import fcntl
import grpc
import logging
import os
import signal
import socket
import subprocess
import tempfile
import threading
import time

import pcap_codec
from proto import sniffer_pb2
from proto import sniffer_pb2_grpc
import sniffer_transport


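# What is currently being captured, expressed as bit flags: THREAD is set while
# the radio capture is running, and ETHERNET is additionally set when the
# docker0 interface is also being captured.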
class CaptureState(enum.Flag):
    NONE = 0
    THREAD = enum.auto()
    ETHERNET = enum.auto()


class SnifferServicer(sniffer_pb2_grpc.Sniffer):
    """ Class representing the sniffer node, whose main task is listening for packets. """

    logger = logging.getLogger('sniffer.SnifferServicer')

    # Maximum number of bytes read from the transport per packet
    RECV_BUFFER_SIZE = 4096
    # Timeout (in seconds) of a transport receive, also reused as the polling
    # interval while waiting for tshark output
    TIMEOUT = 0.1

    def _reset(self):
        """ Clear all per-capture state; called by `__init__` and `Stop`. """
        self._state = CaptureState.NONE
        self._pcap = None
        self._denied_nodeids = None
        self._transport = None
        self._thread = None
        self._thread_alive.clear()
        self._file_sync_done.clear()
        self._tshark_proc = None

    def __init__(self, max_nodes_num):
        self._max_nodes_num = max_nodes_num
        self._thread_alive = threading.Event()
        self._file_sync_done = threading.Event()
        self._nodeids_mutex = threading.Lock()  # for `self._denied_nodeids`
        self._reset()

    def Start(self, request, context):
        """ Start sniffing. """

        self.logger.debug('call Start')

        # Validate and change the state
        if self._state != CaptureState.NONE:
            return sniffer_pb2.StartResponse(status=sniffer_pb2.OPERATION_ERROR)
        self._state = CaptureState.THREAD

        # Create a temporary named pipe
        tempdir = tempfile.mkdtemp()
        fifo_name = os.path.join(tempdir, 'pcap.fifo')
        os.mkfifo(fifo_name)

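        # tshark reads the radio capture from the named pipe; the PcapCodec created
        # below is the writer end. The '-w -' option makes tshark write the resulting
        # pcapng stream to stdout, from where `TransferPcapng` forwards it to the
        # client.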
        cmd = ['tshark', '-i', fifo_name]
        if request.includeEthernet:
            self._state |= CaptureState.ETHERNET
            cmd += ['-i', 'docker0']
        cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']

        self.logger.debug('Running command: %s', ' '.join(cmd))
        self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        self._set_nonblocking(self._tshark_proc.stdout.fileno())
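        # Make tshark's stdout non-blocking so that `TransferPcapng` can poll it:
        # read() then returns None instead of blocking when no new capture data is
        # available yet.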

        # Construct the pcap codec after starting tshark so that opening the named
        # pipe for writing does not block
        self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name)

        # Sniff all nodes by default, i.e. there is no RF enclosure
        self._denied_nodeids = set()

        # Create transport
        transport_factory = sniffer_transport.SnifferTransportFactory()
        self._transport = transport_factory.create_transport()

        # Start the sniffer main loop thread
        self._thread = threading.Thread(target=self._sniffer_main_loop)
        self._thread.daemon = True
        self._transport.open()
        self._thread_alive.set()
        self._thread.start()

        return sniffer_pb2.StartResponse(status=sniffer_pb2.OK)

    def _sniffer_main_loop(self):
        """ Sniffer main loop. """

        while self._thread_alive.is_set():
            try:
                data, nodeid = self._transport.recv(self.RECV_BUFFER_SIZE, self.TIMEOUT)
            except socket.timeout:
                continue

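            # Take a snapshot of the deny list under the lock; `FilterNodes` may
            # replace it concurrently from a gRPC worker thread.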
            with self._nodeids_mutex:
                denied_nodeids = self._denied_nodeids

            # Equivalent to an RF enclosure: drop frames from denied nodes
            if nodeid not in denied_nodeids:
                self._pcap.append(data)

    def TransferPcapng(self, request, context):
        """ Transfer the capture file. """

        # Validate the state; returning early from this generator simply ends the
        # response stream
        if self._state == CaptureState.NONE:
            return

        # Synchronize the capture file
        while True:
            content = self._tshark_proc.stdout.read()
            if content is None:
                # Currently no captured packets
                time.sleep(self.TIMEOUT)
            elif content == b'':
                # EOF is reached when tshark terminates
                break
            else:
                # Forward the captured packets
                yield sniffer_pb2.TransferPcapngResponse(content=content)

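        # Unblock `Stop`, which waits for the whole capture to be drained before
        # resetting the servicer state.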
        self._file_sync_done.set()

    def _set_nonblocking(self, fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        if flags < 0:
            raise RuntimeError('fcntl(F_GETFL) failed')

        flags |= os.O_NONBLOCK
        if fcntl.fcntl(fd, fcntl.F_SETFL, flags) < 0:
            raise RuntimeError('fcntl(F_SETFL) failed')

    def FilterNodes(self, request, context):
        """ Deny packets from the specified nodes, emulating an RF enclosure. """

        self.logger.debug('call FilterNodes')

        # Validate the state
        if not (self._state & CaptureState.THREAD):
            return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR)

        denied_nodeids = set(request.nodeids)
        # Validate the node IDs
        for nodeid in denied_nodeids:
            if not 1 <= nodeid <= self._max_nodes_num:
                return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR)

        with self._nodeids_mutex:
            self._denied_nodeids = denied_nodeids

        return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK)

    def Stop(self, request, context):
        """ Stop sniffing; the capture itself is streamed back via `TransferPcapng`. """

        self.logger.debug('call Stop')

        # Validate and change the state
        if not (self._state & CaptureState.THREAD):
            return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR)
        self._state = CaptureState.NONE

        self._thread_alive.clear()
        self._thread.join()
        self._transport.close()
        self._pcap.close()

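        # Terminating tshark makes its stdout reach EOF, which lets `TransferPcapng`
        # finish streaming the capture and set `_file_sync_done`.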
        self._tshark_proc.terminate()
        self._file_sync_done.wait()
        # Reap the tshark process before `self._reset()` below discards the handle
        self._tshark_proc.wait()

        self._reset()

        return sniffer_pb2.StopResponse(status=sniffer_pb2.OK)


def serve(address_port, max_nodes_num):
    # One worker handles `Start`, `FilterNodes` and `Stop`; the other handles
    # `TransferPcapng`, which the client keeps running in a background thread
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
    sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server)
    # add_secure_port requires a web domain
    server.add_insecure_port(address_port)
    logging.info('server starting on %s', address_port)
    server.start()

    def exit_handler(signum, context):
        server.stop(1)

    signal.signal(signal.SIGINT, exit_handler)
    signal.signal(signal.SIGTERM, exit_handler)

    server.wait_for_termination()


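# Example command-line arguments (the values below are illustrative):
#
#     --grpc-server localhost:50051 --max-nodes-num 33
#
# --grpc-server is the address the gRPC server listens on, and --max-nodes-num
# bounds the node IDs accepted by `FilterNodes`.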
def run_sniffer():
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--grpc-server',
                        dest='grpc_server',
                        type=str,
                        required=True,
                        help='the address of the sniffer server')
    parser.add_argument('--max-nodes-num',
                        dest='max_nodes_num',
                        type=int,
                        required=True,
                        help='the maximum number of nodes')
    args = parser.parse_args()

    serve(args.grpc_server, args.max_nodes_num)


if __name__ == '__main__':
    run_sniffer()