1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import logging
30import sys
31from typing import Iterable, List, Union, Callable
32
33from pyshark.packet.layer import Layer as RawLayer
34from pyshark.packet.packet import Packet as RawPacket
35
36from pktverify import errors
37from pktverify.addrs import EthAddr
38from pktverify.coap import CoapLayer
39from pktverify.consts import VALID_LAYER_NAMES
40from pktverify.decorators import cached_property
41from pktverify.layers import Layer, ThreadMeshcopLayer, Icmpv6Layer, WpanLayer, ThreadNetworkDataLayer
42from pktverify.utils import make_filter_func
43
44
45class Packet(object):
46
47    def __init__(self, packet: RawPacket):
48        self._packet = packet
49        self._strip_wpan_eth_wrapper(packet)
50
51    def __str__(self) -> str:
52        return str(self._packet)
53
54    def __repr__(self) -> str:
55        return repr(self._packet)
56
57    def __dir__(self) -> Iterable[str]:
58        return dir(self._packet)
59
60    def _strip_wpan_eth_wrapper(self, packet: RawPacket):
61        if not hasattr(packet, 'wpan'):
62            return
63
64        for layer in packet.layers:
65            if layer.layer_name == 'eth':
66                packet.layers.remove(layer)
67                eth_src = EthAddr(layer.get_field('eth.src'))
68                eth_dst = EthAddr(layer.get_field('eth.dst'))
69                logging.debug("stripping eth: src=%s, dst=%s", eth_src, eth_dst)
70                channel = eth_src[5]
71                self.wpan._add_field('wpan.channel', hex(channel))
72
73        return
74
75    @property
76    def layers(self) -> Iterable[RawLayer]:
77        for l in self._packet.layers:
78            if l.layer_name != 'data':
79                yield getattr(self, l.layer_name)
80
81    @property
82    def layer_names(self) -> List[str]:
83        return [l.layer_name for l in self._packet.layers if l.layer_name != 'data']
84
85    @cached_property
86    def wpan(self) -> WpanLayer:
87        return WpanLayer(self._packet, 'wpan')
88
89    @cached_property
90    def coap(self) -> CoapLayer:
91        return CoapLayer(self._packet, 'coap')
92
93    @cached_property
94    def icmpv6(self) -> Icmpv6Layer:
95        return Icmpv6Layer(self._packet, 'icmpv6')
96
97    @cached_property
98    def thread_meshcop(self) -> ThreadMeshcopLayer:
99        return ThreadMeshcopLayer(self._packet, 'thread_meshcop')
100
101    @cached_property
102    def thread_nwd(self) -> ThreadNetworkDataLayer:
103        return ThreadNetworkDataLayer(self._packet, 'thread_nwd')
104
105    def __getattr__(self, layer_name: str) -> Layer:
106
107        real_layer_name = layer_name
108        if layer_name == 'lowpan':
109            real_layer_name = '6lowpan'
110
111        assert real_layer_name in VALID_LAYER_NAMES, '%s is not a valid layer name' % real_layer_name
112        _layer = getattr(self._packet, real_layer_name, None)
113        assert _layer is None or isinstance(_layer, RawLayer)
114
115        layer = Layer(self._packet, real_layer_name)
116        setattr(self, layer_name, layer)
117        if real_layer_name != layer_name:
118            setattr(self, real_layer_name, layer)
119
120        return layer
121
122    def verify(self, func: Union[str, Callable], **vars) -> bool:
123        print("\n>>> verifying packet:", file=sys.stderr, flush=False)
124        func = make_filter_func(func, **vars)
125        ok = func(self)
126        print("\t=> %s" % ok, file=sys.stderr)
127        return ok
128
129    def must_verify(self, func: Union[str, Callable], **vars):
130        if not self.verify(func, **vars):
131            self.debug_fields()
132            raise errors.VerifyFailed(self)
133
134        return self
135
136    def must_not_verify(self, func: Union[str, Callable], **vars):
137        if self.verify(func, **vars):
138            raise errors.VerifyFailed(self)
139
140        return self
141
142    @property
143    def sniff_timestamp(self) -> float:
144        return float(self._packet.sniff_timestamp)
145
146    def show(self):
147        self._packet.show()
148
149    def debug_fields(self):
150        for layer in self._packet.layers:
151            print("### Layer %s ###" % layer.layer_name)
152            for k, v in layer._all_fields.items():
153                print("\t\t%r = %r" % (k, v))
154