1#!/usr/bin/env python3 2# 3# Copyright (c) 2019, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29# This is a test script for checking layer fields against a given test.pcap. 30# 31 32import logging 33import unittest 34 35from pktverify import layer_fields 36from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr 37from pktverify.bytes import Bytes 38from pktverify.consts import REAL_LAYER_NAMES, VALID_LAYER_NAMES 39from pktverify.layer_fields_container import LayerFieldsContainer 40from pktverify.null_field import nullField 41from pktverify.packet import Packet 42from pktverify.pcap_reader import PcapReader 43 44 45class TestLayerFields(unittest.TestCase): 46 47 def test(self): 48 logging.basicConfig(level=logging.DEBUG) 49 pkts = PcapReader.read("test.pcap") 50 51 for i, p in enumerate(pkts): 52 53 logging.info("check packet #%d", i + 1) 54 55 for layer_name in VALID_LAYER_NAMES: 56 if layer_name == 'lowpan': # we already checked 6lowpan 57 continue 58 59 layer = getattr(p, layer_name) 60 if hasattr(p._packet, layer_name): 61 if layer_name in REAL_LAYER_NAMES: 62 self.assertTrue(layer) 63 checker = getattr(self, '_test_' + layer_name, None) 64 if checker is None: 65 continue 66 67 try: 68 checker(p) 69 except Exception: 70 layer.show() 71 raise 72 else: 73 if layer_name in REAL_LAYER_NAMES: 74 self.assertFalse(layer) 75 76 for layer in p._packet.layers: 77 self._check_missing_fields(p, layer.layer_name, layer) 78 79 def _test_coap(self, p): 80 coap = p.coap 81 self.assertIsInstance(coap.version, int) 82 self.assertIsInstance(coap.type, int) 83 self.assertIsInstance(coap.token_len, int) 84 self.assertIsInstance(coap.code, int) 85 self.assertIsInstance(coap.mid, int) 86 self.assertIsInstanceOrNull(coap.token, int) 87 self.assertIsInstanceOrNull(coap.opt.uri_path_recon, str) 88 self.assertIsInstanceOrNull(coap.payload, bytearray) 89 90 print(p.coap.tlv.type, p.coap.tlv) 91 assert isinstance(coap.tlv, LayerFieldsContainer), repr(coap.tlv) 92 self.assertIsInstanceOrNull(coap.tlv.type, list) 93 94 def _test_mle(self, p): 95 mle = p.mle 96 97 self._must_have_wpan_aux_sec(p) 98 99 self.assertIsInstance(mle.cmd, int) 100 self.assertIsInstanceOrNull(mle.tlv.mode.receiver_on_idle, int) 101 self.assertIsInstanceOrNull(mle.tlv.mode.reserved1, int) 102 self.assertIsInstanceOrNull(mle.tlv.mode.reserved2, int) 103 self.assertIsInstanceOrNull(mle.tlv.mode.device_type_bit, int) 104 self.assertIsInstanceOrNull(mle.tlv.mode.network_data, int) 105 self.assertIsInstanceOrNull(mle.tlv.challenge, Bytes) 106 self.assertIsInstanceOrNull(mle.tlv.scan_mask.r, int) 107 self.assertIsInstanceOrNull(mle.tlv.scan_mask.e, int) 108 self.assertIsInstanceOrNull(mle.tlv.version, int) 109 self.assertIsInstanceOrNull(mle.tlv.source_addr, int) 110 self.assertIsInstanceOrNull(mle.tlv.active_tstamp, int) 111 self.assertIsInstanceOrNull(mle.tlv.leader_data.partition_id, int) 112 self.assertIsInstanceOrNull(mle.tlv.leader_data.weighting, int) 113 self.assertIsInstanceOrNull(mle.tlv.leader_data.data_version, int) 114 self.assertIsInstanceOrNull(mle.tlv.leader_data.stable_data_version, int) 115 self.assertIsInstanceOrNull(mle.tlv.leader_data.router_id, int) 116 117 def _test_wpan(self, p): 118 wpan = p.wpan 119 self.assertIsInstance(wpan.fcf, int) 120 self.assertIsInstance(wpan.fcs, int) 121 self.assertIsInstance(wpan.security, int) 122 self.assertIsInstance(wpan.pending, int) 123 self.assertIsInstance(wpan.ack_request, int) 124 self.assertIsInstance(wpan.pan_id_compression, int) 125 self.assertIsInstance(wpan.seqno_suppression, int) 126 self.assertIsInstance(wpan.ie_present, int) 127 self.assertIsInstance(wpan.dst_addr_mode, int) 128 self.assertIsInstance(wpan.version, int) 129 self.assertIsInstance(wpan.src_addr_mode, int) 130 131 self.assertIsInstance(wpan.seq_no, int) 132 133 if not wpan.is_ack: 134 self.assertIsInstanceOrNull(wpan.dst_pan, int) 135 self.assertIsInstanceOrNull(wpan.dst16, int) 136 self.assertIsInstanceOrNull(wpan.src16, int) 137 self.assertIsInstanceOrNull(wpan.src64, ExtAddr) 138 self.assertIsInstanceOrNull(wpan.dst64, ExtAddr) 139 140 if wpan.aux_sec: 141 self._must_have_wpan_aux_sec(p) 142 143 def _must_have_wpan_aux_sec(self, p): 144 wpan = p.wpan 145 self.assertIsInstanceOrNull(wpan.aux_sec.sec_suite, int) 146 self.assertIsInstanceOrNull(wpan.aux_sec.security_control_field, int) 147 self.assertIsInstanceOrNull(wpan.aux_sec.sec_level, int) 148 self.assertIsInstanceOrNull(wpan.aux_sec.key_id_mode, int) 149 self.assertIsInstanceOrNull(wpan.aux_sec.frame_counter_suppression, int) 150 self.assertIsInstanceOrNull(wpan.aux_sec.asn_in_nonce, int) 151 self.assertIsInstanceOrNull(wpan.aux_sec.reserved, int) 152 self.assertIsInstanceOrNull(wpan.aux_sec.frame_counter, int) 153 self.assertIsInstanceOrNull(wpan.aux_sec.key_source, int) 154 self.assertIsInstanceOrNull(wpan.aux_sec.key_index, int) 155 156 def assertIsInstanceOrNull(self, field, type): 157 if field is not nullField: 158 self.assertIsInstance(field, type) 159 160 def _test_thread_bl(self, p): 161 thread_bl = p.thread_bl 162 self.assertTrue(thread_bl) 163 164 self.assertIsInstanceOrNull(thread_bl.tlv.target_eid, Ipv6Addr) 165 self.assertIsInstanceOrNull(thread_bl.tlv.ml_eid, ExtAddr) 166 self.assertIsInstanceOrNull(thread_bl.tlv.last_transaction_time, int) 167 self.assertIsInstanceOrNull(p.thread_meshcop.tlv.net_name, list) 168 169 def _test_thread_meshcop(self, p: Packet): 170 thread_meshcop = p.thread_meshcop 171 172 for layer in sorted(p.layers, key=lambda l: l.layer_name): 173 if 'thread_meshcop.tlv.commissioner_sess_id' in layer._layer._all_fields: 174 self.assertIsInstance(thread_meshcop.tlv.commissioner_sess_id, int) 175 176 if 'thread_meshcop.tlv.net_name' in layer._layer._all_fields: 177 self.assertIsInstance(thread_meshcop.tlv.net_name, list) 178 179 if 'thread_meshcop.tlv.channel_page' in layer._layer._all_fields: 180 self.assertIsInstance(thread_meshcop.tlv.channel_page, int) 181 182 if 'thread_meshcop.tlv.channel' in layer._layer._all_fields: 183 self.assertIsInstance(thread_meshcop.tlv.channel, list) 184 185 if 'thread_meshcop.tlv.chan_mask_page' in layer._layer._all_fields: 186 self.assertIsInstance(thread_meshcop.tlv.chan_mask_page, int) 187 if 'thread_meshcop.tlv.chan_mask_len' in layer._layer._all_fields: 188 self.assertIsInstance(thread_meshcop.tlv.chan_mask_len, int) 189 if 'thread_meshcop.tlv.chan_mask_mask' in layer._layer._all_fields: 190 self.assertIsInstance(thread_meshcop.tlv.chan_mask_mask, int) 191 192 if 'thread_meshcop.tlv.panid' in layer._layer._all_fields: 193 self.assertIsInstance(thread_meshcop.tlv.panid, int) 194 195 if 'thread_meshcop.tlv.ml_prefix' in layer._layer._all_fields: 196 self.assertIsInstance(thread_meshcop.tlv.ml_prefix, Bytes) 197 198 if 'thread_meshcop.tlv.master_key' in layer._layer._all_fields: 199 self.assertIsInstance(thread_meshcop.tlv.master_key, Bytes) 200 201 if 'thread_meshcop.tlv.pskc' in layer._layer._all_fields: 202 self.assertIsInstance(thread_meshcop.tlv.pskc, Bytes) 203 204 if 'thread_meshcop.tlv.sec_policy_rot' in layer._layer._all_fields: 205 self.assertIsInstance(thread_meshcop.tlv.sec_policy_rot, int) 206 207 if 'thread_meshcop.tlv.sec_policy_o' in layer._layer._all_fields: 208 self.assertIsInstance(thread_meshcop.tlv.sec_policy_o, int) 209 210 if 'thread_meshcop.tlv.sec_policy_n' in layer._layer._all_fields: 211 self.assertIsInstance(thread_meshcop.tlv.sec_policy_n, int) 212 213 if 'thread_meshcop.tlv.sec_policy_r' in layer._layer._all_fields: 214 self.assertIsInstance(thread_meshcop.tlv.sec_policy_r, int) 215 216 if 'thread_meshcop.tlv.sec_policy_c' in layer._layer._all_fields: 217 self.assertIsInstance(thread_meshcop.tlv.sec_policy_c, int) 218 219 if 'thread_meshcop.tlv.sec_policy_b' in layer._layer._all_fields: 220 self.assertIsInstance(thread_meshcop.tlv.sec_policy_b, int) 221 222 if 'thread_meshcop.tlv.pan_id' in layer._layer._all_fields: 223 self.assertIsInstance(thread_meshcop.tlv.pan_id, list) 224 225 if 'thread_meshcop.tlv.xpan_id' in layer._layer._all_fields: 226 self.assertIsInstance(thread_meshcop.tlv.xpan_id, Bytes) 227 228 for field in layer._layer._all_fields: 229 if field.startswith('thread_meshcop') and not layer_fields.is_layer_field(field) and field not in ( 230 'thread_meshcop.tlv', 'thread_meshcop.tlv.type', 'thread_meshcop.tlv.len8'): 231 print('found %s = %s in layer %s' % ( 232 field, 233 layer._layer.get_field(field), 234 layer.layer_name, 235 )) 236 237 def _test_icmpv6(self, p): 238 icmpv6 = p.icmpv6 239 self.assertTrue(p.icmpv6) 240 241 self.assertIsInstance(icmpv6.type, int) 242 self.assertIsInstance(icmpv6.code, int) 243 self.assertIsInstance(icmpv6.checksum, int) 244 self.assertIsInstanceOrNull(icmpv6.reserved, int) 245 self.assertIsInstanceOrNull(icmpv6.nd.na.flag.s, int) 246 self.assertIsInstanceOrNull(icmpv6.nd.na.flag.o, int) 247 self.assertIsInstanceOrNull(icmpv6.nd.na.flag.r, int) 248 self.assertIsInstanceOrNull(icmpv6.nd.na.flag.rsv, int) 249 self.assertIsInstanceOrNull(icmpv6.nd.ra.cur_hop_limit, int) 250 self.assertIsInstanceOrNull(icmpv6.mldr.nb_mcast_records, int) 251 self.assertIsInstanceOrNull(icmpv6.nd.ns.target_address, Ipv6Addr) 252 self.assertIsInstanceOrNull(icmpv6.mldr.mar.multicast_address, list) 253 254 def get_field(self, p: Packet, f): 255 secs = f.split('.') 256 assert len(secs) >= 2 257 v = p 258 for sec in secs: 259 v = getattr(v, sec) 260 261 return v 262 263 def _test_6lowpan(self, p): 264 lowpan = p.lowpan 265 assert lowpan is getattr(p, '6lowpan') 266 self.assertIsInstanceOrNull(lowpan.src, Ipv6Addr) 267 self.assertIsInstanceOrNull(lowpan.dst, Ipv6Addr) 268 self.assertIsInstanceOrNull(lowpan.udp.src, int) 269 self.assertIsInstanceOrNull(lowpan.udp.dst, int) 270 self.assertIsInstanceOrNull(lowpan.udp.checksum, int) 271 self.assertIsInstanceOrNull(lowpan.frag.size, int) 272 self.assertIsInstanceOrNull(lowpan.frag.tag, int) 273 self.assertIsInstanceOrNull(lowpan.frag.offset, int) 274 self.assertIsInstanceOrNull(lowpan.nhc.pattern, list) 275 self.assertIsInstanceOrNull(lowpan.nhc.udp.checksum, int) 276 self.assertIsInstanceOrNull(lowpan.nhc.udp.ports, int) 277 278 self.assertIsInstanceOrNull(lowpan.pattern, list) 279 self.assertIsInstanceOrNull(lowpan.iphc.tf, int) 280 self.assertIsInstanceOrNull(lowpan.iphc.nh, int) 281 self.assertIsInstanceOrNull(lowpan.iphc.hlim, int) 282 self.assertIsInstanceOrNull(lowpan.iphc.cid, int) 283 self.assertIsInstanceOrNull(lowpan.iphc.sac, int) 284 self.assertIsInstanceOrNull(lowpan.iphc.sam, int) 285 self.assertIsInstanceOrNull(lowpan.iphc.m, int) 286 self.assertIsInstanceOrNull(lowpan.iphc.dac, int) 287 self.assertIsInstanceOrNull(lowpan.iphc.dam, int) 288 self.assertIsInstanceOrNull(lowpan.iphc.sctx.prefix, Bytes) 289 self.assertIsInstanceOrNull(lowpan.iphc.dctx.prefix, Bytes) 290 291 def _test_ip(self, p): 292 pass 293 294 def _test_ipv6(self, p): 295 pass 296 297 def _test_udp(self, p): 298 pass 299 300 def _test_eth(self, p): 301 eth = p.eth 302 self.assertIsInstance(eth.src, EthAddr) 303 self.assertIsInstance(eth.dst, EthAddr) 304 self.assertIsInstance(eth.type, int) 305 306 def _check_missing_fields(self, p, layer_name, _layer): 307 for f in sorted(_layer._all_fields.keys(), reverse=True): 308 if f.startswith('_ws') or f.startswith('data'): 309 continue 310 311 logging.info('_check_missing_fields in layer %s: %s = %r' % (layer_name, f, _layer._all_fields[f])) 312 if f in { 313 '', 'icmpv6.checksum.status', 'ip.ttl.lncb', 'wpan.aux_sec.key_source.bytes', 'wpan.src64.origin' 314 }: 315 # TODO: handle these fields 316 continue 317 318 v = _layer._all_fields[f] 319 320 if layer_fields.is_layer_field_container(f): 321 continue 322 323 try: 324 rv = self.get_field(p, f) 325 self.assertIsNot(rv, nullField) 326 327 parser = layer_fields._LAYER_FIELDS[f] 328 if isinstance(parser, layer_fields._first): 329 parser = parser._sub_parse 330 331 if parser in (layer_fields._raw_hex, layer_fields._hex, layer_fields._raw_hex_rev, layer_fields._dec, 332 layer_fields._auto): 333 self.assertIsInstance(rv, int) 334 elif isinstance(parser, layer_fields._list): 335 self.assertIsInstance(rv, list) 336 elif parser is layer_fields._ipv6_addr: 337 self.assertIsInstance(rv, Ipv6Addr) 338 elif parser is layer_fields._eth_addr: 339 self.assertIsInstance(rv, EthAddr) 340 elif parser is layer_fields._ext_addr: 341 self.assertIsInstance(rv, ExtAddr) 342 elif parser is layer_fields._str: 343 self.assertIsInstance(rv, str) 344 elif parser is layer_fields._bytes: 345 self.assertIsInstance(rv, Bytes) 346 elif parser is layer_fields._payload: 347 self.assertIsInstance(rv, bytearray) 348 elif parser is layer_fields._float: 349 self.assertIsInstance(rv, float) 350 else: 351 raise NotImplementedError(parser) 352 except Exception: 353 logging.info('checking [%s] %s=%r, %r, %r (%d)' % 354 (layer_name, f, v, v.get_default_value(), v.raw_value, len(v.fields))) 355 raise 356