1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import struct
31
32from collections import namedtuple
33from enum import Enum
34
# IEEE 802.15.4 Frame Control Field (FCF) values and masks, expressed as
# already-shifted 16-bit integers so they can be OR-ed / AND-ed with a raw FCF.

# Frame type: bits 0-2.
FCF_FRAME_BEACON = 0 << 0
FCF_FRAME_DATA = 1 << 0
FCF_FRAME_ACK = 2 << 0
FCF_FRAME_MAC_CMD = 3 << 0
FCF_FRAME_TYPE_MASK = 7 << 0

# Single-bit flags.
FCF_SECURITY_ENABLED = 1 << 3
FCF_FRAME_PENDING = 1 << 4
FCF_ACK_REQUEST = 1 << 5
FCF_PANID_COMPRESSION = 1 << 6
FCF_IE_PRESENT = 1 << 9

# Destination addressing mode: bits 10-11.
FCF_DST_ADDR_NONE = 0 << 10
FCF_DST_ADDR_SHORT = 2 << 10
FCF_DST_ADDR_EXT = 3 << 10
FCF_DST_ADDR_MASK = 3 << 10

# Frame version: bits 12-13.
FCF_FRAME_VERSION_2006 = 1 << 12
FCF_FRAME_VERSION_2015 = 2 << 12
FCF_FRAME_VERSION_MASK = 3 << 12

# Source addressing mode: bits 14-15.
FCF_SRC_ADDR_NONE = 0 << 14
FCF_SRC_ADDR_SHORT = 2 << 14
FCF_SRC_ADDR_EXT = 3 << 14
FCF_SRC_ADDR_MASK = 3 << 14
# Misspelled legacy name ("FCD" instead of "FCF"); kept as an alias so that
# existing references continue to work.
FCD_SRC_ADDR_MASK = FCF_SRC_ADDR_MASK
56
57
class DstAddrMode(Enum):
    """Possible values of the two-bit destination addressing mode field of the FCF."""

    NONE = 0b00  # no destination address in the frame
    RESERVED = 0b01
    SHORT = 0b10  # destination address is a 16-bit short address
    EXTENDED = 0b11  # destination address is a 64-bit extended address
63
64
class FrameType(Enum):
    """Values of the three-bit frame type field (FCF bits 0-2).

    Every value the three-bit field can hold has a member, so converting a
    masked FCF with ``FrameType(fcf & 0x7)`` never raises ``ValueError``.
    """

    BEACON = 0
    DATA = 1
    ACK = 2
    COMMAND = 3
    # Values 4-7 as assigned by IEEE 802.15.4-2015.
    RESERVED = 4
    MULTIPURPOSE = 5
    FRAGMENT = 6
    EXTENDED = 7
70
71
class WpanFrameInfo(namedtuple('WpanFrameInfo', ['fcf', 'seq_no', 'dst_extaddr', 'dst_short'])):
    """Immutable record of header fields taken from one WPAN frame.

    Fields:
        fcf: Frame Control Field as an integer.
        seq_no: sequence number of the frame.
        dst_extaddr: destination extended address, or None when not applicable.
        dst_short: destination short address, or None when not applicable.
    """

    @property
    def frame_type(self) -> FrameType:
        """Frame type decoded from the three low-order FCF bits."""
        type_bits = self.fcf & 0b111
        return FrameType(type_bits)

    @property
    def dst_addr_mode(self) -> DstAddrMode:
        """Destination addressing mode decoded from FCF bits 10-11."""
        mode_bits = (self.fcf >> 10) & 0b11
        return DstAddrMode(mode_bits)

    @property
    def is_broadcast(self) -> bool:
        """Whether the frame is sent to the short address 0xffff."""
        if self.dst_addr_mode != DstAddrMode.SHORT:
            return False
        return self.dst_short == 0xffff
85
86
def _is_version_2015(fcf: int) -> bool:
    """Return True when the frame version field of `fcf` equals FCF_FRAME_VERSION_2015."""
    version_bits = fcf & FCF_FRAME_VERSION_MASK
    return version_bits == FCF_FRAME_VERSION_2015
89
90
def _is_dst_addr_present(fcf: int) -> bool:
    """Return True unless the destination addressing mode of `fcf` is DstAddrMode.NONE.

    The RESERVED mode therefore also counts as "present".
    """
    mode_bits = (fcf >> 10) & 0b11
    return DstAddrMode(mode_bits) is not DstAddrMode.NONE
94
95
# Combinations of (destination addressing mode, source addressing mode,
# PAN ID Compression bit) for which a version-2015 frame carries no
# Destination PAN ID field. Any combination not listed here has the field.
_DST_PAN_ID_NOT_PRESENT_SET = {
    # Neither address present, compression bit clear.
    FCF_SRC_ADDR_NONE | FCF_DST_ADDR_NONE,
    # Destination address only, compression bit set.
    FCF_PANID_COMPRESSION | FCF_SRC_ADDR_NONE | FCF_DST_ADDR_SHORT,
    FCF_PANID_COMPRESSION | FCF_SRC_ADDR_NONE | FCF_DST_ADDR_EXT,
    # Source address only, compression bit clear.
    FCF_SRC_ADDR_SHORT | FCF_DST_ADDR_NONE,
    FCF_SRC_ADDR_EXT | FCF_DST_ADDR_NONE,
    # Source address only, compression bit set.
    FCF_PANID_COMPRESSION | FCF_SRC_ADDR_SHORT | FCF_DST_ADDR_NONE,
    FCF_PANID_COMPRESSION | FCF_SRC_ADDR_EXT | FCF_DST_ADDR_NONE,
    # Both addresses extended, compression bit set.
    FCF_PANID_COMPRESSION | FCF_SRC_ADDR_EXT | FCF_DST_ADDR_EXT,
}
106
107
def _is_dst_pan_id_present(fcf: int) -> bool:
    """Tell whether a frame with this FCF carries a Destination PAN ID field.

    For frames that are not version 2015 the PAN ID is present exactly when a
    destination address is present. Version-2015 frames are looked up in
    _DST_PAN_ID_NOT_PRESENT_SET using the addressing modes and the PAN ID
    Compression bit.
    """
    if not _is_version_2015(fcf):
        return _is_dst_addr_present(fcf)

    # Keep only the bits the lookup table is keyed on.
    key = fcf & (FCF_DST_ADDR_MASK | FCD_SRC_ADDR_MASK | FCF_PANID_COMPRESSION)
    return key not in _DST_PAN_ID_NOT_PRESENT_SET
116
117
# Sequence Number Suppression bit of the FCF. It is only defined for frame
# version 2015; in earlier versions the bit is reserved.
_FCF_SEQ_NO_SUPPRESSION = 1 << 8


def dissect(frame: bytes) -> WpanFrameInfo:
    """Extract the FCF, sequence number and destination address from a frame.

    Byte 0 of `frame` is skipped and never interpreted here; the Frame Control
    Field is read little-endian from bytes 1-2 and the remaining header fields
    follow it.

    Args:
        frame: Raw frame bytes laid out as described above.

    Returns:
        A WpanFrameInfo. `seq_no` is None when a version-2015 frame has the
        Sequence Number Suppression bit set. `dst_short` is an int when the
        destination addressing mode is SHORT, `dst_extaddr` is a 16-digit
        lowercase hex string when it is EXTENDED; otherwise both are None.

    Raises:
        struct.error: If `frame` is too short to hold the FCF or the
            destination address.
        IndexError: If `frame` ends before the sequence number.
    """
    fcf = struct.unpack("<H", frame[1:3])[0]
    offset = 3

    # A version-2015 frame may omit the sequence number entirely, which moves
    # every following field one byte earlier.
    seq_no = None
    if not (_is_version_2015(fcf) and fcf & _FCF_SEQ_NO_SUPPRESSION):
        seq_no = frame[offset]
        offset += 1

    dst_addr_mode = DstAddrMode((fcf & FCF_DST_ADDR_MASK) >> 10)
    dst_extaddr, dst_short = None, None

    if dst_addr_mode in (DstAddrMode.SHORT, DstAddrMode.EXTENDED):
        # The destination address follows the (optional) Destination PAN ID.
        if _is_dst_pan_id_present(fcf):
            offset += 2

        if dst_addr_mode is DstAddrMode.SHORT:
            dst_short = struct.unpack('<H', frame[offset:offset + 2])[0]
        else:
            dst_extaddr = '%016x' % struct.unpack('<Q', frame[offset:offset + 8])[0]

    return WpanFrameInfo(fcf=fcf, seq_no=seq_no, dst_extaddr=dst_extaddr, dst_short=dst_short)
139