#!/usr/bin/env python3
#
#  Copyright (c) 2020, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import struct

from collections import namedtuple
from enum import Enum

# Frame Control Field (FCF) bit definitions.  The FCF is handled as a 16-bit
# little-endian integer (see `dissect`); each constant below is a value or a
# mask positioned at its bit offset inside that integer.

# Frame type: bits 0-2.
FCF_FRAME_BEACON = 0 << 0
FCF_FRAME_DATA = 1 << 0
FCF_FRAME_ACK = 2 << 0
FCF_FRAME_MAC_CMD = 3 << 0
FCF_FRAME_TYPE_MASK = 7 << 0

# Single-bit flags.
FCF_SECURITY_ENABLED = 1 << 3
FCF_FRAME_PENDING = 1 << 4
FCF_ACK_REQUEST = 1 << 5
FCF_PANID_COMPRESSION = 1 << 6
FCF_IE_PRESENT = 1 << 9

# Destination addressing mode: bits 10-11.
FCF_DST_ADDR_NONE = 0 << 10
FCF_DST_ADDR_SHORT = 2 << 10
FCF_DST_ADDR_EXT = 3 << 10
FCF_DST_ADDR_MASK = 3 << 10

# Frame version: bits 12-13.
FCF_FRAME_VERSION_2006 = 1 << 12
FCF_FRAME_VERSION_2015 = 2 << 12
FCF_FRAME_VERSION_MASK = 3 << 12

# Source addressing mode: bits 14-15.
FCF_SRC_ADDR_NONE = 0 << 14
FCF_SRC_ADDR_SHORT = 2 << 14
FCF_SRC_ADDR_EXT = 3 << 14
FCF_SRC_ADDR_MASK = 3 << 14
# Misspelled historical name of FCF_SRC_ADDR_MASK; kept as an alias so code
# that still refers to it keeps working.
FCD_SRC_ADDR_MASK = FCF_SRC_ADDR_MASK

# Bit offset of the destination addressing mode inside the FCF.
_FCF_DST_ADDR_SHIFT = 10

# Offset of the first addressing field: one leading byte that this module does
# not interpret, two FCF bytes and one sequence-number byte.
_ADDRESSING_OFFSET = 4

# Size in bytes of a PAN ID field.
_PAN_ID_SIZE = 2


class DstAddrMode(Enum):
    """Destination addressing mode as encoded in FCF bits 10-11."""
    NONE = 0
    RESERVED = 1
    SHORT = 2
    EXTENDED = 3


class FrameType(Enum):
    """Frame types recognised by this module (FCF bits 0-2, values 0-3)."""
    BEACON = 0
    DATA = 1
    ACK = 2
    COMMAND = 3


def _dst_addr_mode(fcf: int) -> DstAddrMode:
    """Return the destination addressing mode encoded in `fcf`."""
    return DstAddrMode((fcf & FCF_DST_ADDR_MASK) >> _FCF_DST_ADDR_SHIFT)


class WpanFrameInfo(namedtuple('WpanFrameInfo', ['fcf', 'seq_no', 'dst_extaddr', 'dst_short'])):
    """Header fields extracted from a frame by `dissect`.

    Fields:
        fcf: Frame Control Field as an integer.
        seq_no: Sequence number byte.
        dst_extaddr: Extended destination address as a 16-digit lowercase hex
            string, or None when the frame carries no extended destination.
        dst_short: Short destination address as an integer, or None when the
            frame carries no short destination.
    """

    # Keep instances as lightweight as the underlying tuple (no per-instance
    # __dict__), as recommended for namedtuple subclasses.
    __slots__ = ()

    @property
    def frame_type(self) -> FrameType:
        """Frame type from the FCF.

        Raises:
            ValueError: if the encoded type is not a `FrameType` member.
        """
        return FrameType(self.fcf & FCF_FRAME_TYPE_MASK)

    @property
    def dst_addr_mode(self) -> DstAddrMode:
        """Destination addressing mode from the FCF."""
        return _dst_addr_mode(self.fcf)

    @property
    def is_broadcast(self) -> bool:
        """True when the destination is the short address 0xffff."""
        return self.dst_addr_mode == DstAddrMode.SHORT and self.dst_short == 0xffff


def _is_version_2015(fcf: int) -> bool:
    """Return True when the FCF frame-version bits select the 2015 version."""
    return (fcf & FCF_FRAME_VERSION_MASK) == FCF_FRAME_VERSION_2015


def _is_dst_addr_present(fcf: int) -> bool:
    """Return True when the FCF declares any destination addressing mode."""
    return _dst_addr_mode(fcf) != DstAddrMode.NONE


# Combinations of (destination mode, source mode, PAN ID compression bit) for
# which a version-2015 frame carries no destination PAN ID.
_DST_PAN_ID_NOT_PRESENT_SET = frozenset({
    FCF_DST_ADDR_NONE | FCF_SRC_ADDR_NONE,
    FCF_DST_ADDR_EXT | FCF_SRC_ADDR_NONE | FCF_PANID_COMPRESSION,
    FCF_DST_ADDR_SHORT | FCF_SRC_ADDR_NONE | FCF_PANID_COMPRESSION,
    FCF_DST_ADDR_NONE | FCF_SRC_ADDR_EXT,
    FCF_DST_ADDR_NONE | FCF_SRC_ADDR_SHORT,
    FCF_DST_ADDR_NONE | FCF_SRC_ADDR_EXT | FCF_PANID_COMPRESSION,
    FCF_DST_ADDR_NONE | FCF_SRC_ADDR_SHORT | FCF_PANID_COMPRESSION,
    FCF_DST_ADDR_EXT | FCF_SRC_ADDR_EXT | FCF_PANID_COMPRESSION,
})


def _is_dst_pan_id_present(fcf: int) -> bool:
    """Return True when the frame described by `fcf` has a destination PAN ID.

    Version-2015 frames are decided by the addressing-mode / PAN ID compression
    combination; every other version has a destination PAN ID exactly when a
    destination address is present.
    """
    if _is_version_2015(fcf):
        combination = fcf & (FCF_DST_ADDR_MASK | FCF_SRC_ADDR_MASK | FCF_PANID_COMPRESSION)
        return combination not in _DST_PAN_ID_NOT_PRESENT_SET

    return _is_dst_addr_present(fcf)


def dissect(frame: bytes) -> WpanFrameInfo:
    """Extract the FCF, sequence number and destination address from `frame`.

    Layout read by this function: byte 0 is skipped, bytes 1-2 hold the
    little-endian FCF, byte 3 holds the sequence number and the addressing
    fields start at byte 4.

    NOTE(review): byte 0 is presumably a prefix added by the caller (for
    example a channel or length byte) - confirm against the caller.
    NOTE(review): the sequence number is always assumed to be present; the
    Sequence Number Suppression bit of version-2015 frames is not handled.

    Args:
        frame: Raw frame bytes, including the leading uninterpreted byte.

    Returns:
        A `WpanFrameInfo`; `dst_short` / `dst_extaddr` are None unless the
        matching destination addressing mode is used.

    Raises:
        IndexError: if `frame` has no byte at index 3.
        struct.error: if the FCF or the destination address is truncated.
    """
    fcf = struct.unpack('<H', frame[1:3])[0]
    seq_no = frame[3]

    dst_extaddr = None
    dst_short = None

    mode = _dst_addr_mode(fcf)
    if mode in (DstAddrMode.SHORT, DstAddrMode.EXTENDED):
        # The destination address follows the destination PAN ID when present.
        offset = _ADDRESSING_OFFSET
        if _is_dst_pan_id_present(fcf):
            offset += _PAN_ID_SIZE

        if mode == DstAddrMode.SHORT:
            dst_short = struct.unpack('<H', frame[offset:offset + 2])[0]
        else:
            extaddr = struct.unpack('<Q', frame[offset:offset + 8])[0]
            dst_extaddr = format(extaddr, '016x')

    return WpanFrameInfo(fcf=fcf, seq_no=seq_no, dst_extaddr=dst_extaddr, dst_short=dst_short)