1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29# This is a test script for checking layer fields against a given test.pcap.
30#
31
32import logging
33import unittest
34
35from pktverify import layer_fields
36from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr
37from pktverify.bytes import Bytes
38from pktverify.consts import REAL_LAYER_NAMES, VALID_LAYER_NAMES
39from pktverify.layer_fields_container import LayerFieldsContainer
40from pktverify.null_field import nullField
41from pktverify.packet import Packet
42from pktverify.pcap_reader import PcapReader
43
44
class TestLayerFields(unittest.TestCase):
    """Verify the Python types of the layer fields that pktverify exposes for `test.pcap`.

    `test` is the single unittest entry point.  For every packet it

    1. dispatches to the `_test_<layer name>` method (when one exists) of every
       layer the underlying packet carries, and
    2. walks all raw fields of every layer and checks, via the parser registered
       in `layer_fields._LAYER_FIELDS`, that the packet wrapper exposes each one
       with the matching Python type (`_check_missing_fields`).
    """

    # Raw field names starting with one of these prefixes are skipped by `_check_missing_fields`.
    _IGNORED_FIELD_PREFIXES = ('_ws', 'data')

    # Raw fields that `_check_missing_fields` knowingly does not verify.
    # TODO: handle these fields
    _UNHANDLED_FIELDS = frozenset({
        '',
        'icmpv6.checksum.status',
        'ip.ttl.lncb',
        'wpan.aux_sec.key_source.bytes',
        'wpan.src64.origin',
    })

    def test(self):
        """Run the per-layer checks and the raw-field checks on every packet of "test.pcap".

        The relative path "test.pcap" is handed to `PcapReader.read` unchanged.
        """
        logging.basicConfig(level=logging.DEBUG)
        pkts = PcapReader.read("test.pcap")

        for index, p in enumerate(pkts, 1):
            logging.info("check packet #%d", index)

            for layer_name in VALID_LAYER_NAMES:
                # 'lowpan' is covered by the '6lowpan' entry: `_test_6lowpan`
                # asserts that both names resolve to the same object.
                if layer_name == 'lowpan':
                    continue

                self._check_layer(p, layer_name)

            for layer in p._packet.layers:
                self._check_missing_fields(p, layer.layer_name, layer)

    def _check_layer(self, p, layer_name):
        """Check the presence of one layer of packet `p` and run its `_test_<layer_name>` method.

        For names in `REAL_LAYER_NAMES` the truthiness of `getattr(p, layer_name)`
        must agree with whether the underlying `p._packet` has that attribute.
        If the layer is present and a checker method exists, it is called; when
        the checker raises, the layer is dumped with `layer.show()` before the
        exception is re-raised.
        """
        layer = getattr(p, layer_name)
        is_real_layer = layer_name in REAL_LAYER_NAMES

        if not hasattr(p._packet, layer_name):
            if is_real_layer:
                self.assertFalse(layer, 'layer %s is absent from the packet but evaluates true' % layer_name)
            return

        if is_real_layer:
            self.assertTrue(layer, 'layer %s is present in the packet but evaluates false' % layer_name)

        checker = getattr(self, '_test_' + layer_name, None)
        if checker is None:
            # No dedicated checker for this layer.
            return

        try:
            checker(p)
        except Exception:
            # Dump the offending layer to make the failure easier to diagnose.
            layer.show()
            raise

    def _test_coap(self, p):
        """Check the types of the CoAP header fields, payload and TLV container of `p`."""
        coap = p.coap
        self.assertIsInstance(coap.version, int)
        self.assertIsInstance(coap.type, int)
        self.assertIsInstance(coap.token_len, int)
        self.assertIsInstance(coap.code, int)
        self.assertIsInstance(coap.mid, int)
        self.assertIsInstanceOrNull(coap.token, int)
        self.assertIsInstanceOrNull(coap.opt.uri_path_recon, str)
        self.assertIsInstanceOrNull(coap.payload, bytearray)

        logging.debug('coap.tlv.type=%s coap.tlv=%s', coap.tlv.type, coap.tlv)
        # assertIsInstance (unlike a bare `assert`) survives `python -O`, and its
        # default failure message already contains the repr of the object.
        self.assertIsInstance(coap.tlv, LayerFieldsContainer)
        self.assertIsInstanceOrNull(coap.tlv.type, list)

    def _test_mle(self, p):
        """Check the types of the MLE command and TLV fields of `p`, plus its wpan.aux_sec fields."""
        mle = p.mle

        self._must_have_wpan_aux_sec(p)

        self.assertIsInstance(mle.cmd, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.receiver_on_idle, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.reserved1, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.reserved2, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.device_type_bit, int)
        self.assertIsInstanceOrNull(mle.tlv.mode.network_data, int)
        self.assertIsInstanceOrNull(mle.tlv.challenge, Bytes)
        self.assertIsInstanceOrNull(mle.tlv.scan_mask.r, int)
        self.assertIsInstanceOrNull(mle.tlv.scan_mask.e, int)
        self.assertIsInstanceOrNull(mle.tlv.version, int)
        self.assertIsInstanceOrNull(mle.tlv.source_addr, int)
        self.assertIsInstanceOrNull(mle.tlv.active_tstamp, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.partition_id, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.weighting, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.data_version, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.stable_data_version, int)
        self.assertIsInstanceOrNull(mle.tlv.leader_data.router_id, int)

    def _test_wpan(self, p):
        """Check the types of the wpan frame-control, sequence and address fields of `p`."""
        wpan = p.wpan
        self.assertIsInstance(wpan.fcf, int)
        self.assertIsInstance(wpan.fcs, int)
        self.assertIsInstance(wpan.security, int)
        self.assertIsInstance(wpan.pending, int)
        self.assertIsInstance(wpan.ack_request, int)
        self.assertIsInstance(wpan.pan_id_compression, int)
        self.assertIsInstance(wpan.seqno_suppression, int)
        self.assertIsInstance(wpan.ie_present, int)
        self.assertIsInstance(wpan.dst_addr_mode, int)
        self.assertIsInstance(wpan.version, int)
        self.assertIsInstance(wpan.src_addr_mode, int)

        self.assertIsInstance(wpan.seq_no, int)

        # Address fields are only inspected for frames that are not ACKs.
        if not wpan.is_ack:
            self.assertIsInstanceOrNull(wpan.dst_pan, int)
            self.assertIsInstanceOrNull(wpan.dst16, int)
            self.assertIsInstanceOrNull(wpan.src16, int)
            self.assertIsInstanceOrNull(wpan.src64, ExtAddr)
            self.assertIsInstanceOrNull(wpan.dst64, ExtAddr)

        if wpan.aux_sec:
            self._must_have_wpan_aux_sec(p)

    def _must_have_wpan_aux_sec(self, p):
        """Check that every wpan.aux_sec field of `p` is an int or `nullField`."""
        aux_sec = p.wpan.aux_sec
        self.assertIsInstanceOrNull(aux_sec.sec_suite, int)
        self.assertIsInstanceOrNull(aux_sec.security_control_field, int)
        self.assertIsInstanceOrNull(aux_sec.sec_level, int)
        self.assertIsInstanceOrNull(aux_sec.key_id_mode, int)
        self.assertIsInstanceOrNull(aux_sec.frame_counter_suppression, int)
        self.assertIsInstanceOrNull(aux_sec.asn_in_nonce, int)
        self.assertIsInstanceOrNull(aux_sec.reserved, int)
        self.assertIsInstanceOrNull(aux_sec.frame_counter, int)
        self.assertIsInstanceOrNull(aux_sec.key_source, int)
        self.assertIsInstanceOrNull(aux_sec.key_index, int)

    def assertIsInstanceOrNull(self, field, type):  # noqa: A002 - parameter name kept for compatibility
        """Assert that `field` is an instance of `type`, unless it is the `nullField` object.

        `nullField` is compared by identity; it is presumably pktverify's marker
        for a field that is absent from the packet (confirm in pktverify.null_field).
        """
        if field is not nullField:
            self.assertIsInstance(field, type)

    def _test_thread_bl(self, p):
        """Check that the thread_bl layer of `p` is truthy and that its TLV fields have the expected types."""
        thread_bl = p.thread_bl
        self.assertTrue(thread_bl)

        self.assertIsInstanceOrNull(thread_bl.tlv.target_eid, Ipv6Addr)
        self.assertIsInstanceOrNull(thread_bl.tlv.ml_eid, ExtAddr)
        self.assertIsInstanceOrNull(thread_bl.tlv.last_transaction_time, int)
        # NOTE(review): this reads the thread_meshcop layer, not thread_bl; possibly
        # a copy/paste leftover - kept unchanged, confirm the intent.
        self.assertIsInstanceOrNull(p.thread_meshcop.tlv.net_name, list)

    def _test_thread_meshcop(self, p: 'Packet'):
        """Check the types of the thread_meshcop TLV fields that occur in any layer of `p`.

        For each layer (visited in `layer_name` order) a TLV is type-checked only
        when its raw field name is present in that layer's `_all_fields`.  Raw
        `thread_meshcop*` fields that `layer_fields.is_layer_field` does not know
        (apart from a few generic TLV fields) are logged for information.
        """
        thread_meshcop = p.thread_meshcop

        # (TLV attribute name, expected Python type), in the order they are checked.
        tlv_types = (
            ('commissioner_sess_id', int),
            ('net_name', list),
            ('channel_page', int),
            ('channel', list),
            ('chan_mask_page', int),
            ('chan_mask_len', int),
            ('chan_mask_mask', int),
            ('panid', int),
            ('ml_prefix', Bytes),
            ('master_key', Bytes),
            ('pskc', Bytes),
            ('sec_policy_rot', int),
            ('sec_policy_o', int),
            ('sec_policy_n', int),
            ('sec_policy_r', int),
            ('sec_policy_c', int),
            ('sec_policy_b', int),
            ('pan_id', list),
            ('xpan_id', Bytes),
        )
        # Generic TLV fields that are not reported as unknown.
        not_reported = ('thread_meshcop.tlv', 'thread_meshcop.tlv.type', 'thread_meshcop.tlv.len8')

        for layer in sorted(p.layers, key=lambda lyr: lyr.layer_name):
            raw_fields = layer._layer._all_fields

            for tlv_name, expected_type in tlv_types:
                raw_name = 'thread_meshcop.tlv.' + tlv_name
                if raw_name in raw_fields:
                    self.assertIsInstance(getattr(thread_meshcop.tlv, tlv_name), expected_type, raw_name)

            for field in raw_fields:
                if (field.startswith('thread_meshcop') and not layer_fields.is_layer_field(field) and
                        field not in not_reported):
                    logging.info('found %s = %s in layer %s', field, layer._layer.get_field(field), layer.layer_name)

    def _test_icmpv6(self, p):
        """Check that the icmpv6 layer of `p` is truthy and that its fields have the expected types."""
        icmpv6 = p.icmpv6
        self.assertTrue(icmpv6)

        self.assertIsInstance(icmpv6.type, int)
        self.assertIsInstance(icmpv6.code, int)
        self.assertIsInstance(icmpv6.checksum, int)
        self.assertIsInstanceOrNull(icmpv6.reserved, int)
        self.assertIsInstanceOrNull(icmpv6.nd.na.flag.s, int)
        self.assertIsInstanceOrNull(icmpv6.nd.na.flag.o, int)
        self.assertIsInstanceOrNull(icmpv6.nd.na.flag.r, int)
        self.assertIsInstanceOrNull(icmpv6.nd.na.flag.rsv, int)
        self.assertIsInstanceOrNull(icmpv6.nd.ra.cur_hop_limit, int)
        self.assertIsInstanceOrNull(icmpv6.mldr.nb_mcast_records, int)
        self.assertIsInstanceOrNull(icmpv6.nd.ns.target_address, Ipv6Addr)
        self.assertIsInstanceOrNull(icmpv6.mldr.mar.multicast_address, list)

    def get_field(self, p: 'Packet', f):
        """Resolve the dotted field name `f` (e.g. "coap.code") by chained attribute access on `p`.

        Args:
            p: Object on which the first component of `f` is looked up.
            f: Dotted name with at least two components.

        Returns:
            Whatever the last attribute access yields.

        Raises:
            AssertionError: If `f` has fewer than two dot-separated components.
        """
        secs = f.split('.')
        # An explicit unittest assertion keeps this check alive under `python -O`.
        self.assertGreaterEqual(len(secs), 2, 'field name %r must contain a layer and a field part' % (f,))

        v = p
        for sec in secs:
            v = getattr(v, sec)

        return v

    def _test_6lowpan(self, p):
        """Check the types of the 6LoWPAN fields of `p` and that `p.lowpan` is the same object as `p.6lowpan`."""
        lowpan = p.lowpan
        # '6lowpan' is not a valid identifier, hence getattr().
        self.assertIs(lowpan, getattr(p, '6lowpan'))
        self.assertIsInstanceOrNull(lowpan.src, Ipv6Addr)
        self.assertIsInstanceOrNull(lowpan.dst, Ipv6Addr)
        self.assertIsInstanceOrNull(lowpan.udp.src, int)
        self.assertIsInstanceOrNull(lowpan.udp.dst, int)
        self.assertIsInstanceOrNull(lowpan.udp.checksum, int)
        self.assertIsInstanceOrNull(lowpan.frag.size, int)
        self.assertIsInstanceOrNull(lowpan.frag.tag, int)
        self.assertIsInstanceOrNull(lowpan.frag.offset, int)
        self.assertIsInstanceOrNull(lowpan.nhc.pattern, list)
        self.assertIsInstanceOrNull(lowpan.nhc.udp.checksum, int)
        self.assertIsInstanceOrNull(lowpan.nhc.udp.ports, int)

        self.assertIsInstanceOrNull(lowpan.pattern, list)
        self.assertIsInstanceOrNull(lowpan.iphc.tf, int)
        self.assertIsInstanceOrNull(lowpan.iphc.nh, int)
        self.assertIsInstanceOrNull(lowpan.iphc.hlim, int)
        self.assertIsInstanceOrNull(lowpan.iphc.cid, int)
        self.assertIsInstanceOrNull(lowpan.iphc.sac, int)
        self.assertIsInstanceOrNull(lowpan.iphc.sam, int)
        self.assertIsInstanceOrNull(lowpan.iphc.m, int)
        self.assertIsInstanceOrNull(lowpan.iphc.dac, int)
        self.assertIsInstanceOrNull(lowpan.iphc.dam, int)
        self.assertIsInstanceOrNull(lowpan.iphc.sctx.prefix, Bytes)
        self.assertIsInstanceOrNull(lowpan.iphc.dctx.prefix, Bytes)

    def _test_ip(self, p):
        """No layer-specific checks for the ip layer."""
        pass

    def _test_ipv6(self, p):
        """No layer-specific checks for the ipv6 layer."""
        pass

    def _test_udp(self, p):
        """No layer-specific checks for the udp layer."""
        pass

    def _test_eth(self, p):
        """Check the types of the Ethernet source, destination and type fields of `p`."""
        eth = p.eth
        self.assertIsInstance(eth.src, EthAddr)
        self.assertIsInstance(eth.dst, EthAddr)
        self.assertIsInstance(eth.type, int)

    def _expected_type(self, parser):
        """Return the Python type that a `layer_fields` parser is expected to produce.

        A `layer_fields._first` wrapper is unwrapped to its `_sub_parse` first.

        Raises:
            NotImplementedError: If the (unwrapped) parser is not one of the known parsers.
        """
        if isinstance(parser, layer_fields._first):
            parser = parser._sub_parse

        if parser in (layer_fields._raw_hex, layer_fields._hex, layer_fields._raw_hex_rev, layer_fields._dec,
                      layer_fields._auto):
            return int
        if isinstance(parser, layer_fields._list):
            return list
        if parser is layer_fields._ipv6_addr:
            return Ipv6Addr
        if parser is layer_fields._eth_addr:
            return EthAddr
        if parser is layer_fields._ext_addr:
            return ExtAddr
        if parser is layer_fields._str:
            return str
        if parser is layer_fields._bytes:
            return Bytes
        if parser is layer_fields._payload:
            return bytearray
        if parser is layer_fields._float:
            return float

        raise NotImplementedError(parser)

    def _check_missing_fields(self, p, layer_name, _layer):
        """Check that every raw field of `_layer` is exposed by `p` with the type its parser implies.

        Args:
            p: The packet wrapper on which the fields are resolved (see `get_field`).
            layer_name: Name of the layer, used for log messages only.
            _layer: The raw layer object; its `_all_fields` mapping is walked in
                reverse-sorted key order.

        Raises:
            AssertionError: If a field resolves to `nullField` or has an unexpected type.
            KeyError: If a field has no entry in `layer_fields._LAYER_FIELDS`.
            NotImplementedError: If the field's parser is not known to `_expected_type`.
        """
        all_fields = _layer._all_fields

        for f in sorted(all_fields, reverse=True):
            if f.startswith(self._IGNORED_FIELD_PREFIXES):
                continue

            v = all_fields[f]
            logging.info('_check_missing_fields in layer %s: %s = %r', layer_name, f, v)

            if f in self._UNHANDLED_FIELDS:
                continue

            if layer_fields.is_layer_field_container(f):
                continue

            try:
                rv = self.get_field(p, f)
                self.assertIsNot(rv, nullField, '%s resolved to nullField' % f)

                expected_type = self._expected_type(layer_fields._LAYER_FIELDS[f])
                self.assertIsInstance(rv, expected_type)
            except Exception:
                # Log the raw field details before propagating the failure.
                logging.info('checking [%s] %s=%r, %r, %r (%d)', layer_name, f, v, v.get_default_value(), v.raw_value,
                             len(v.fields))
                raise
356