1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import logging
30import sys
31from typing import Iterable, List, Union, Callable
32
33from pyshark.packet.layer import Layer as RawLayer
34from pyshark.packet.packet import Packet as RawPacket
35
36from pktverify import errors
37from pktverify.addrs import EthAddr
38from pktverify.coap import CoapLayer
39from pktverify.consts import VALID_LAYER_NAMES
40from pktverify.decorators import cached_property
41from pktverify.layers import Layer, ThreadMeshcopLayer, Icmpv6Layer, WpanLayer, ThreadNetworkDataLayer, DnsLayer
42from pktverify.utils import make_filter_func
43
44
45class Packet(object):
46
47    def __init__(self, packet: RawPacket):
48        self._packet = packet
49        self._strip_wpan_eth_wrapper(packet)
50
51    def __str__(self) -> str:
52        return str(self._packet)
53
54    def __repr__(self) -> str:
55        return repr(self._packet)
56
57    def __dir__(self) -> Iterable[str]:
58        return dir(self._packet)
59
60    def _strip_wpan_eth_wrapper(self, packet: RawPacket):
61        if not hasattr(packet, 'wpan'):
62            return
63
64        for layer in packet.layers:
65            if layer.layer_name == 'eth':
66                packet.layers.remove(layer)
67                eth_src = EthAddr(layer.get_field('eth.src'))
68                eth_dst = EthAddr(layer.get_field('eth.dst'))
69                logging.debug("stripping eth: src=%s, dst=%s", eth_src, eth_dst)
70                channel = eth_src[5]
71                self.wpan._add_field('wpan.channel', hex(channel))
72
73        return
74
75    @property
76    def layers(self) -> Iterable[RawLayer]:
77        for l in self._packet.layers:
78            if l.layer_name != 'data':
79                yield getattr(self, l.layer_name)
80
81    @property
82    def layer_names(self) -> List[str]:
83        return [l.layer_name for l in self._packet.layers if l.layer_name != 'data']
84
85    @cached_property
86    def wpan(self) -> WpanLayer:
87        return WpanLayer(self._packet, 'wpan')
88
89    @cached_property
90    def coap(self) -> CoapLayer:
91        return CoapLayer(self._packet, 'coap')
92
93    @cached_property
94    def icmpv6(self) -> Icmpv6Layer:
95        return Icmpv6Layer(self._packet, 'icmpv6')
96
97    @cached_property
98    def thread_meshcop(self) -> ThreadMeshcopLayer:
99        return ThreadMeshcopLayer(self._packet, 'thread_meshcop')
100
101    @cached_property
102    def thread_nwd(self) -> ThreadNetworkDataLayer:
103        return ThreadNetworkDataLayer(self._packet, 'thread_nwd')
104
105    @cached_property
106    def dns(self) -> DnsLayer:
107        return DnsLayer(self._packet, 'dns')
108
109    def __getattr__(self, layer_name: str) -> Layer:
110
111        real_layer_name = layer_name
112        if layer_name == 'lowpan':
113            real_layer_name = '6lowpan'
114
115        assert real_layer_name in VALID_LAYER_NAMES, '%s is not a valid layer name' % real_layer_name
116        _layer = getattr(self._packet, real_layer_name, None)
117        assert _layer is None or isinstance(_layer, RawLayer)
118
119        layer = Layer(self._packet, real_layer_name)
120        setattr(self, layer_name, layer)
121        if real_layer_name != layer_name:
122            setattr(self, real_layer_name, layer)
123
124        return layer
125
126    def verify(self, func: Union[str, Callable], **vars) -> bool:
127        print("\n>>> verifying packet:", file=sys.stderr, flush=False)
128        func = make_filter_func(func, **vars)
129        ok = func(self)
130        print("\t=> %s" % ok, file=sys.stderr)
131        return ok
132
133    def must_verify(self, func: Union[str, Callable], **vars):
134        if not self.verify(func, **vars):
135            self.debug_fields()
136            raise errors.VerifyFailed(self)
137
138        return self
139
140    def must_not_verify(self, func: Union[str, Callable], **vars):
141        if self.verify(func, **vars):
142            raise errors.VerifyFailed(self)
143
144        return self
145
146    @property
147    def sniff_timestamp(self) -> float:
148        return float(self._packet.sniff_timestamp)
149
150    def show(self):
151        self._packet.show()
152
153    def debug_fields(self):
154        for layer in self._packet.layers:
155            print("### Layer %s ###" % layer.layer_name)
156            for k, v in layer._all_fields.items():
157                print("\t\t%r = %r" % (k, v))
158