1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import hmac
31import hashlib
32import struct
33
34from binascii import hexlify
35
36
class CryptoEngine:
    """ Class responsible for encryption and decryption of data.

    Uses AES in CCM mode. The key, nonce and authenticated data for each
    message are obtained from the crypto material creator passed to the
    constructor.
    """

    def __init__(self, crypto_material_creator):
        """
        Args:
            crypto_material_creator: Object exposing a `mic_length` attribute and a
                `create_key_and_nonce_and_authenticated_data(message_info)` method.

        """
        self._crypto_material_creator = crypto_material_creator

    @property
    def mic_length(self):
        """ MIC length as reported by the crypto material creator. """
        return self._crypto_material_creator.mic_length

    def _create_cipher(self, message_info):
        """ Create an AES-CCM cipher already fed with the authenticated data.

        Shared by encrypt() and decrypt(), which need exactly the same setup.

        Args:
            message_info (MessageInfo)

        Returns:
            AES-CCM cipher object ready to process the message payload.

        """
        key, nonce, auth_data = self._crypto_material_creator.create_key_and_nonce_and_authenticated_data(message_info)

        # Imported at call time, so importing this module does not itself
        # require the Crypto package.
        from Crypto.Cipher import AES

        cipher = AES.new(key, AES.MODE_CCM, nonce, mac_len=self.mic_length)
        cipher.update(auth_data)
        return cipher

    def encrypt(self, data, message_info):
        """ Encrypt message.

        Args:
            data (bytearray)
            message_info (MessageInfo)

        Returns:
            tuple: Encrypted message and MIC, exactly as returned by the
                cipher's encrypt_and_digest().

        """
        return self._create_cipher(message_info).encrypt_and_digest(bytes(data))

    def decrypt(self, enc_data, mic, message_info):
        """ Decrypt MLE message.

        Any exception raised by the cipher's decrypt_and_verify() is propagated
        to the caller.

        Args:
            enc_data (bytearray)
            mic (bytearray)
            message_info (MessageInfo)

        Returns:
            bytearray: Decrypted message.

        """
        cipher = self._create_cipher(message_info)
        dec_data = cipher.decrypt_and_verify(bytes(enc_data), bytes(mic))
        return bytearray(dec_data)
93
94
class CryptoMaterialCreator(object):
    """ Base class deriving MLE and MAC keys from a network key.

    Subclasses must provide create_key_and_nonce_and_authenticated_data()
    and the mic_length property; here both raise NotImplementedError.
    """

    # Constant appended to the packed sequence counter to form the HMAC message.
    _salt = b'Thread'

    def __init__(self, network_key):
        """
        Args:
            network_key (bytearray)

        """
        self.network_key = network_key

    def _generate_keys(self, sequence_counter):
        """ Generate MLE and MAC keys.

        The keys are the two halves of HMAC-SHA256 computed with the network
        key over the big-endian 32-bit sequence counter followed by the salt.

        Read more: 7.1.4. Key Generation - Thread v1.1 Specification Final

        Args:
            sequence_counter (int)

        Returns:
            tuple: MLE and MAC as bytes

        """
        digest = hmac.new(
            self.network_key,
            struct.pack(">L", sequence_counter) + self._salt,
            digestmod=hashlib.sha256,
        ).digest()

        # First 16 bytes form the MLE key, the remainder the MAC key.
        mle_key, mac_key = digest[:16], digest[16:]
        return mle_key, mac_key

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Not implemented in the base class. """
        raise NotImplementedError

    @property
    def mic_length(self):
        """ Not implemented in the base class. """
        raise NotImplementedError
133
134
class MacCryptoMaterialCreator(CryptoMaterialCreator):
    """ Crypto material creator that supplies the MAC key. """

    def __init__(self, network_key):
        """
        Args:
            network_key (bytearray)

        """
        super().__init__(network_key)

    def _create_nonce(self, eui64, frame_counter, security_level):
        """ Create CCM Nonce required by AES-128 CCM for encryption and decryption.

        The nonce is eui64 followed by the big-endian 32-bit frame counter
        and the one-byte security level.

        Read more: 7.6.3.2 CCM Nonce - Std 802.15.4-2006

        Args:
            eui64 (bytes)
            frame_counter (int)
            security_level (int)

        Returns:
            bytes: created Nonce

        """
        counter_and_level = struct.pack(">LB", frame_counter, security_level)
        return bytes(eui64 + counter_and_level)

    def _create_authenticated_data(self, mhr, auxiliary_security_header, extra_open_fields):
        """ Create Authenticated Data

        The three inputs are concatenated in the order given.

        Read more: 7.6.3.3 CCM prerequisites - Std 802.15.4-2006

        Args:
            mhr (bytes)
            auxiliary_security_header (bytes)
            extra_open_fields (bytes)

        Returns:
            bytes: Authenticated Data

        """
        header_part = mhr + auxiliary_security_header
        return bytes(header_part + extra_open_fields)

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Produce the MAC key, nonce and authenticated data for a message.

        Args:
            message_info (MessageInfo)

        Returns:
            tuple: MAC key, nonce and authenticated data

        """
        aux_header = message_info.aux_sec_hdr

        # Only the MAC half of the generated key pair is used here.
        mac_key = self._generate_keys(aux_header.sequence_counter)[1]

        nonce = self._create_nonce(message_info.source_mac_address, aux_header.frame_counter,
                                   aux_header.security_level)
        auth_data = self._create_authenticated_data(message_info.mhr_bytes, message_info.aux_sec_hdr_bytes,
                                                    message_info.extra_open_fields)

        return mac_key, nonce, auth_data

    @property
    def mic_length(self):
        """ MIC length used by this creator. """
        return 4
197
198
class MleCryptoMaterialCreator(CryptoMaterialCreator):
    """ Crypto material creator that supplies the MLE key. """

    def __init__(self, network_key):
        """
        Args:
            network_key (bytearray)

        """
        super().__init__(network_key)

    def _create_nonce(self, source_eui64, frame_counter, security_level):
        """ Create CCM Nonce required by AES-128 CCM for encryption and decryption.

        The nonce is the first 8 bytes of source_eui64 followed by the
        big-endian 32-bit frame counter and the one-byte security level.

        Read more: 7.6.3.2 CCM Nonce - Std 802.15.4-2006

        Args:
            source_eui64 (bytearray)
            frame_counter (int)
            security_level (int)

        Returns:
            bytes: created Nonce

        """
        address_part = source_eui64[:8]
        return bytes(address_part + struct.pack(">LB", frame_counter, security_level))

    def _create_authenticated_data(self, source_address, destination_address, auxiliary_security_header):
        """ Create Authenticated Data

        Concatenates the packed source address, the packed destination
        address and the auxiliary security header, in that order.

        Read more: 4.8 - Thread v1.0 Specification

        Args:
            source_address (ip_address)
            destination_address (ip_address)
            auxiliary_security_header (bytearray)

        Returns:
            bytes: Authenticated Data

        """
        addresses = source_address.packed + destination_address.packed
        return bytes(addresses + auxiliary_security_header)

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Produce the MLE key, nonce and authenticated data for a message.

        Args:
            message_info (MessageInfo)

        Returns:
            tuple: MLE key, nonce and authenticated data

        """
        aux_header = message_info.aux_sec_hdr

        # Only the MLE half of the generated key pair is used here.
        mle_key = self._generate_keys(aux_header.sequence_counter)[0]

        nonce = self._create_nonce(message_info.source_mac_address.mac_address, aux_header.frame_counter,
                                   aux_header.security_level)
        auth_data = self._create_authenticated_data(message_info.source_ipv6, message_info.destination_ipv6,
                                                    message_info.aux_sec_hdr_bytes)

        return mle_key, nonce, auth_data

    @property
    def mic_length(self):
        """ MIC length used by this creator. """
        return 4
261
262
class AuxiliarySecurityHeader:
    """ Holds the fields of an auxiliary security header and derives the key sequence counter. """

    def __init__(
        self,
        key_id_mode,
        security_level,
        frame_counter,
        key_id,
        big_endian=True,
    ):
        """
        Args:
            key_id_mode (int)
            security_level (int)
            frame_counter (int)
            key_id (bytearray)
            big_endian (bool): Byte order used when the sequence counter is
                unpacked from key_id (Key ID Modes 0 and 2).
        """
        self._key_id_mode = key_id_mode
        self._security_level = security_level
        self._frame_counter = frame_counter
        self._key_id = key_id
        self._big_endian = big_endian

    @property
    def sequence_counter(self):
        """ Compute or extract sequence counter based on currently set Key Index Mode.

        Returns:
            int: Key sequence counter.

        Raises:
            ValueError: If the Key ID Mode is not 0, 1 or 2.
            struct.error: If key_id holds fewer bytes than the mode unpacks.

        """

        if self.key_id_mode == 0:
            # NOTE(review): this branch unpacks 8 bytes of Key ID, while
            # AuxiliarySecurityHeaderFactory reads 0 Key ID bytes for mode 0
            # (and 9 for mode 3) - confirm which mode this is meant to serve.
            key_source = self.key_id[:8]
            unpack_format = ">Q" if self._big_endian else "<Q"
        elif self.key_id_mode == 1:
            # Try to guess valid Key Sequence Counter based on Key Index. This
            # one should work for now.
            return self.key_index - 1
        elif self.key_id_mode == 2:
            # In this mode sequence counter is stored on the first four bytes
            # of Key ID.
            key_source = self.key_id[:4]
            unpack_format = ">I" if self._big_endian else "<I"
        else:
            raise ValueError("Unsupported Key Index Mode: {}".format(self.key_id_mode))

        return struct.unpack(unpack_format, key_source)[0]

    @property
    def key_index(self):
        """ Last byte of Key ID interpreted as an unsigned integer. """
        return struct.unpack(">B", self.key_id[-1:])[0]

    @property
    def key_id_mode(self):
        return self._key_id_mode

    @property
    def security_level(self):
        return self._security_level

    @property
    def frame_counter(self):
        return self._frame_counter

    @property
    def key_id(self):
        return self._key_id

    def __repr__(self):
        # hexlify() returns bytes; decode it so the text shows plain hex digits
        # instead of a b'..' literal (formatting bytes also triggers
        # BytesWarning when Python runs with -b).
        return "AuxiliarySecurityHeader(key_id_mode={}, security_level={}, frame_counter={}, key_id={})".format(
            self.key_id_mode,
            self.security_level,
            self.frame_counter,
            hexlify(self.key_id).decode("ascii"),
        )
334
335
class AuxiliarySecurityHeaderFactory:
    """ Reads an auxiliary security header from a byte stream. """

    # Sizes, in bytes, of the fixed-length leading fields.
    _SECURITY_CONTROL_LENGTH = 1
    _FRAME_COUNTER_LENGTH = 4

    # Number of Key ID bytes read for each Key ID Mode.
    _KEY_ID_LENGTH_KEY_ID_0 = 0
    _KEY_ID_LENGTH_KEY_ID_1 = 1
    _KEY_ID_LENGTH_KEY_ID_2 = 5
    _KEY_ID_LENGTH_KEY_ID_3 = 9

    _key_id_lengths = {
        0: _KEY_ID_LENGTH_KEY_ID_0,
        1: _KEY_ID_LENGTH_KEY_ID_1,
        2: _KEY_ID_LENGTH_KEY_ID_2,
        3: _KEY_ID_LENGTH_KEY_ID_3,
    }

    def _parse_security_control(self, security_control_byte):
        """ Split the security control byte into its two fields.

        Args:
            security_control_byte (int)

        Returns:
            tuple: security level (bits 0-2) and Key ID Mode (bits 3-4)

        """
        key_id_mode = (security_control_byte >> 3) & 0b11
        security_level = security_control_byte & 0b111
        return security_level, key_id_mode

    def _parse_frame_counter(self, frame_counter_bytes):
        """ Decode the frame counter as a little-endian unsigned 32-bit integer. """
        (frame_counter,) = struct.unpack("<I", frame_counter_bytes)
        return frame_counter

    def _key_id_length(self, key_id_mode):
        """ Look up how many Key ID bytes follow for the given Key ID Mode. """
        lengths = self._key_id_lengths
        return lengths[key_id_mode]

    def parse(self, data, message_info):
        """ Read an auxiliary security header from data and record it on message_info.

        Sets message_info.aux_sec_hdr_bytes to the raw header bytes consumed
        and message_info.aux_sec_hdr to the parsed header.

        Args:
            data: Stream-like object whose read(n) returns the next n bytes.
            message_info (MessageInfo)

        Returns:
            AuxiliarySecurityHeader: The parsed header.

        """
        control_raw = bytearray(data.read(self._SECURITY_CONTROL_LENGTH))
        counter_raw = bytearray(data.read(self._FRAME_COUNTER_LENGTH))

        security_level, key_id_mode = self._parse_security_control(control_raw[0])
        frame_counter = self._parse_frame_counter(counter_raw)

        # The Key ID field length depends on the mode found in the control byte.
        key_id_raw = bytearray(data.read(self._key_id_length(key_id_mode)))

        header = AuxiliarySecurityHeader(key_id_mode, security_level, frame_counter, key_id_raw)

        message_info.aux_sec_hdr_bytes = control_raw + counter_raw + key_id_raw
        message_info.aux_sec_hdr = header

        return header
381