1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import logging
30import os
31import sys
32from typing import Callable, Union
33
34from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr
35from pktverify.bytes import Bytes
36from pktverify.null_field import nullField
37
38
39def make_filter_func(func: Union[str, Callable], **vars) -> Callable:
40    """
41    Convert the filter to a callable function if it's a string.
42
43    :param func: The filter string or callable.
44    :param vars: The variables.
45    :return: The filter callable.
46    """
47    if isinstance(func, str):
48        # if func is a string, compile it to a function
49        func = func.format_map({k: repr(v) for k, v in vars.items()}).strip()
50        print("\t%s" % func, file=sys.stderr)
51        code = compile('(\n' + func + '\n)', func, "eval")
52
53        def func(p):
54            return eval(
55                code, None, {
56                    'p': p,
57                    'coap': p.coap,
58                    'wpan': p.wpan,
59                    'mle': p.mle,
60                    'ipv6': p.ipv6,
61                    'lowpan': p.lowpan,
62                    'eth': p.eth,
63                    'icmpv6': p.icmpv6,
64                    'udp': p.udp,
65                    'thread_bl': p.thread_bl,
66                    'thread_meshcop': p.thread_meshcop,
67                    'Bytes': Bytes,
68                    'ExtAddr': ExtAddr,
69                    'Ipv6Addr': Ipv6Addr,
70                    'EthAddr': EthAddr,
71                    'thread_nm': p.thread_nm,
72                    'thread_nwd': p.thread_nwd,
73                    'thread_address': p.thread_address,
74                    'thread_bcn': p.thread_bcn,
75                    'null': nullField,
76                })
77    else:
78        assert not vars, 'can not provide vars for non-str filter: %r %r' % (func, vars)
79
80    assert callable(func)
81    return func
82
83
84def _setup_wireshark_disabled_protos():
85    home = os.environ['HOME']
86    wireshark_config_dir = os.path.join(home, '.config', 'wireshark')
87    os.makedirs(wireshark_config_dir, exist_ok=True)
88    disabled_protos_path = os.path.join(wireshark_config_dir, 'disabled_protos')
89    # read current disabled protos
90    try:
91        with open(disabled_protos_path, 'rt') as fd:
92            disabled_protos = set(l.strip() for l in fd if l.strip() != '')
93    except FileNotFoundError:
94        disabled_protos = set()
95
96    old_disabled_protos_num = len(disabled_protos)
97    disabled_protos.add('lwm')
98    disabled_protos.add('prp')
99    disabled_protos.add('stcsig')
100    disabled_protos.add('transum')
101    disabled_protos.add('zbee_nwk')
102    disabled_protos.add('zbee_nwk_gp')
103
104    if len(disabled_protos) > old_disabled_protos_num:
105        logging.info(f"set disabled_protos = {' '.join(disabled_protos)}")
106        with open(disabled_protos_path, 'wt') as fd:
107            fd.write('\n'.join(sorted(disabled_protos)))
108            fd.write('\n')
109
110
111def get_wireshark_dir() -> str:
112    """
113    :return: The path to wireshark directory.
114    """
115    dir = '/tmp/thread-wireshark'
116    _setup_wireshark_disabled_protos()
117    return dir
118
119
120def which_tshark() -> str:
121    """
122    :return: The path to `tshark` executable.
123    """
124    return os.path.join(get_wireshark_dir(), 'tshark')
125
126
127def which_dumpcap() -> str:
128    """
129    :return: The path to `dumpcap` executable.
130    """
131    return os.path.join(get_wireshark_dir(), 'dumpcap')
132
133
134def which_mergecap() -> str:
135    """
136    :return: The path to `mergecap` executable.
137    """
138    return os.path.join(get_wireshark_dir(), 'mergecap')
139
140
141def colon_hex(hexstr, interval) -> str:
142    """ Convert hexstr to colon seperated string every interval
143
144    :param hexstr: The hex string to convert.
145    :param interval: The interval number.
146    :return: The colon seperated string.
147    """
148    assert len(hexstr) % interval == 0
149    return ':'.join(hexstr[i:i + interval] for i in range(0, len(hexstr), interval))
150
151
152def is_sublist(lst1: list, lst2: list) -> bool:
153    """ Test whether lst1 is a slice of lst2
154
155    :param lst1: The list to judge if it is a sublist of lst2.
156    :param lst2: The list to judge if contains lst1.
157    :return: Whether lst1 is a slice of lst2.
158    """
159    return lst1 in [lst2[i:len(lst1) + i] for i in range(len(lst1))]
160