1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import io
31import random
32import struct
33import unittest
34import ipaddress
35
36import common
37import net_crypto
38
# Fixed 128-bit network key shared by the deterministic tests in this module.
network_key = bytearray.fromhex("00 11 22 33 44 55 66 77 88 99 aa bb cc dd ee ff")
41
42
def convert_aux_sec_hdr_to_bytearray(aux_sec_hdr):
    """Serialize an auxiliary security header object into a bytearray.

    Layout of the result:
      * one security-control byte: the security level OR-ed with the two low
        bits of the key id mode shifted left by three,
      * the frame counter packed as a little-endian unsigned 32-bit integer,
      * the key id bytes, appended unchanged.
    """
    security_control = aux_sec_hdr.security_level | ((aux_sec_hdr.key_id_mode & 0x03) << 3)

    serialized = bytearray((security_control,))
    serialized.extend(struct.pack("<L", aux_sec_hdr.frame_counter))
    serialized += aux_sec_hdr.key_id

    return serialized
48
49
def any_eui64():
    """Return a random 8-byte value, as a bytearray, to use as an EUI-64."""
    octets = bytearray()
    for _ in range(8):
        octets.append(random.getrandbits(8))
    return octets
52
53
def any_security_level():
    """Return a random 3-bit integer (0..7) to use as a security level."""
    level = random.getrandbits(3)
    return level
56
57
def any_key_id_mode():
    """Return the key id mode used by the generated test data.

    Only key id mode 2 is produced.
    """
    only_supported_mode = 2
    return only_supported_mode
63
64
def any_key_id(key_id_mode):
    """Return a random key id, as a bytearray, sized for the given key id mode.

    Only key id mode 2 is supported; it yields a 5-byte key id.

    Raises:
        ValueError: if key_id_mode is anything other than 2. Previously this
            case fell through to an unbound local variable and failed with an
            opaque UnboundLocalError.
    """
    if key_id_mode != 2:
        raise ValueError("unsupported key id mode: {!r} (only mode 2 is supported)".format(key_id_mode))

    length = 5

    return bytearray([random.getrandbits(8) for _ in range(length)])
70
71
def any_auxiliary_security_header():
    """Build a net_crypto.AuxiliarySecurityHeader filled with generated values.

    The key id mode comes from any_key_id_mode() and the key id is generated
    for that mode; the security level and frame counter are random.
    """
    mode = any_key_id_mode()
    identifier = any_key_id(mode)
    level = any_security_level()
    counter = any_frame_counter()

    return net_crypto.AuxiliarySecurityHeader(mode, level, counter, identifier)
77
78
def any_frame_counter():
    """Return a random unsigned 32-bit integer to use as a frame counter."""
    counter = random.getrandbits(32)
    return counter
81
82
def any_ip_address():
    """Return an ipaddress address object built from 16 random bytes."""
    packed = bytes(random.getrandbits(8) for _ in range(16))
    return ipaddress.ip_address(packed)
86
87
def any_data(length=None):
    """Return a bytearray of random bytes.

    When length is None, the length is drawn at random from 0..128 inclusive.
    """
    if length is None:
        length = random.randint(0, 128)

    payload = bytearray()
    for _ in range(length):
        payload.append(random.getrandbits(8))
    return payload
91
92
def any_network_key():
    """Return a random 16-byte network key as a bytearray."""
    return bytearray(random.getrandbits(8) for _ in range(16))
95
96
class TestCryptoEngine(unittest.TestCase):
    """Exercises net_crypto.CryptoEngine backed by an MleCryptoMaterialCreator."""

    @staticmethod
    def _known_vector_message_info():
        """Return a fresh MessageInfo for the fixed encrypt/decrypt vector."""
        info = common.MessageInfo()
        info.source_mac_address = common.MacAddress.from_eui64(bytearray.fromhex("00 35 cc 94 d7 7a 07 e8"))

        info.source_ipv6 = "fe80::235:cc94:d77a:07e8"
        info.destination_ipv6 = "ff02::2"

        info.aux_sec_hdr = net_crypto.AuxiliarySecurityHeader(
            key_id_mode=2,
            security_level=5,
            frame_counter=262165,
            key_id=bytearray.fromhex("00 00 00 00 01"),
        )
        info.aux_sec_hdr_bytes = convert_aux_sec_hdr_to_bytearray(info.aux_sec_hdr)
        return info

    def test_should_decrypt_bytearray_to_mle_message_when_decrypt_method_is_called(self):
        # GIVEN
        message_info = self._known_vector_message_info()

        ciphertext = bytearray.fromhex("9a 5a 9a 5b ba 25 9c 5e 58 a2 7e 75 74 ef 79 bc 4f a3"
                                       " f9 ae a8 34 f6 f2 37 21 93 60")
        mic = bytearray.fromhex("e1 b5 a2 53")

        engine = net_crypto.CryptoEngine(net_crypto.MleCryptoMaterialCreator(network_key))

        # WHEN
        decrypted = engine.decrypt(ciphertext, mic, message_info)

        # THEN
        expected_plaintext = bytearray.fromhex("04 00 02 00 00 09 0b 8f 80 00 00 00 00 00 40 00 01 f1"
                                               " 0b 08 65 5e 0f 83 40 c7 83 31")
        self.assertEqual(expected_plaintext, decrypted)

    def test_should_encrypt_mle_message_to_bytearray_when_encrypt_method_is_called(self):
        # GIVEN
        message_info = self._known_vector_message_info()

        plaintext = bytearray.fromhex("04 00 02 00 00 09 0b 8f 80 00 00 00 00 00 40 00 01 f1"
                                      " 0b 08 65 5e 0f 83 40 c7 83 31")

        engine = net_crypto.CryptoEngine(net_crypto.MleCryptoMaterialCreator(network_key))

        # WHEN
        ciphertext, mic = engine.encrypt(plaintext, message_info)

        # THEN
        # Expected output is the ciphertext immediately followed by the 4-byte MIC.
        expected_ciphertext_and_mic = bytearray.fromhex("9a 5a 9a 5b ba 25 9c 5e 58 a2 7e 75 74 ef 79 bc 4f a3"
                                                        " f9 ae a8 34 f6 f2 37 21 93 60 e1 b5 a2 53")

        self.assertEqual(expected_ciphertext_and_mic, ciphertext + mic)

    def test_should_encrypt_and_decrypt_random_data_content_when_proper_methods_are_called(self):
        # GIVEN
        payload = any_data()

        # Deliberately a fresh random key rather than the module-level network_key.
        random_network_key = any_network_key()

        mode = 2
        level = 5

        message_info = common.MessageInfo()
        message_info.source_mac_address = common.MacAddress.from_eui64(any_eui64())

        message_info.source_ipv6 = any_ip_address()
        message_info.destination_ipv6 = any_ip_address()

        message_info.aux_sec_hdr = net_crypto.AuxiliarySecurityHeader(
            key_id_mode=mode,
            security_level=level,
            frame_counter=any_frame_counter(),
            key_id=any_key_id(mode),
        )
        message_info.aux_sec_hdr_bytes = convert_aux_sec_hdr_to_bytearray(message_info.aux_sec_hdr)

        engine = net_crypto.CryptoEngine(net_crypto.MleCryptoMaterialCreator(random_network_key))

        # WHEN
        ciphertext, mic = engine.encrypt(payload, message_info)
        round_tripped = engine.decrypt(ciphertext, mic, message_info)

        # THEN
        self.assertEqual(payload, round_tripped)
195
196
class TestCryptoMaterialCreator(unittest.TestCase):
    """Key generation checks for net_crypto.CryptoMaterialCreator.

    Key generation is described in the Thread specification.

    Read more: Thread 1.1.0 Specification Candidate Final - 7.1.4 Key Generation

    The test vectors were taken from the Thread specification.
    """

    def _check_generated_keys(self, sequence_counter, expected_mle_key, expected_mac_key):
        """Generate keys for sequence_counter from the module network key and compare both."""
        creator = net_crypto.CryptoMaterialCreator(network_key)

        # WHEN
        mle_key, mac_key = creator._generate_keys(sequence_counter)

        # THEN
        self.assertEqual(mle_key, expected_mle_key)
        self.assertEqual(mac_key, expected_mac_key)

    def test_should_generate_mle_and_mac_key_when_generate_keys_method_is_called_with_sequence_counter_equal_0(self):
        """
        7.1.4.1 Test Vector 1
        """
        self._check_generated_keys(
            sequence_counter=0,
            expected_mle_key=bytearray.fromhex("54 45 f4 15 8f d7 59 12 17 58 09 f8 b5 7a 66 a4"),
            expected_mac_key=bytearray.fromhex("de 89 c5 3a f3 82 b4 21 e0 fd e5 a9 ba e3 be f0"),
        )

    def test_should_generate_mle_and_mac_key_when_generate_keys_method_is_called_with_sequence_counter_equal_1(self):
        """
        7.1.4.2 Test Vector 2
        """
        self._check_generated_keys(
            sequence_counter=1,
            expected_mle_key=bytearray.fromhex("8f 4c d1 a2 7d 95 c0 7d 12 db 89 74 bd 61 5c 13"),
            expected_mac_key=bytearray.fromhex("9b e0 d1 af 7b d8 73 50 de ab cd d0 7f eb b9 d5"),
        )

    def test_should_generate_mle_and_mac_key_when_generate_keys_method_is_called_with_sequence_counter_equal_2(self):
        """
        7.1.4.3 Test Vector 3
        """
        self._check_generated_keys(
            sequence_counter=2,
            expected_mle_key=bytearray.fromhex("01 6e 2a b8 ec 88 87 96 87 a7 2e 0a 35 7e cf 2a"),
            expected_mac_key=bytearray.fromhex("56 41 09 e9 d2 aa d7 f7 23 ec 3b 96 11 0e ef a3"),
        )
273
274
class TestMleCryptoMaterialCreator(unittest.TestCase):
    """Checks the nonce and authenticated data built by net_crypto.MleCryptoMaterialCreator."""

    def test_should_create_nonce_when_create_nonce_method_is_called(self):
        # GIVEN
        eui64 = any_eui64()
        counter = any_frame_counter()
        level = any_security_level()

        creator = net_crypto.MleCryptoMaterialCreator(network_key)

        # WHEN
        nonce = creator._create_nonce(eui64, counter, level)

        # THEN
        # Expected layout: 8-byte EUI-64, big-endian 32-bit frame counter, 1-byte security level.
        reader = io.BytesIO(nonce)
        eui64_field = reader.read(8)
        counter_field = reader.read(4)
        level_field = reader.read(1)

        self.assertEqual(eui64, eui64_field)
        self.assertEqual(struct.pack(">L", counter), counter_field)
        self.assertEqual(level, ord(level_field))

    def test_should_create_authenticated_data_when_create_authenticated_data_method_is_called(self):
        """
        Only key id mode 2 is exercised, so the serialized Auxiliary Security
        Header is always 10 bytes long.
        """

        # GIVEN
        source = any_ip_address()
        destination = any_ip_address()
        aux_sec_hdr_bytes = convert_aux_sec_hdr_to_bytearray(any_auxiliary_security_header())

        creator = net_crypto.MleCryptoMaterialCreator(network_key)

        # WHEN
        authenticated_data = creator._create_authenticated_data(source, destination, aux_sec_hdr_bytes)

        # THEN
        # Expected layout: 16-byte source, 16-byte destination, 10-byte auxiliary security header.
        reader = io.BytesIO(authenticated_data)
        source_field = reader.read(16)
        destination_field = reader.read(16)
        aux_sec_hdr_field = reader.read(10)

        self.assertEqual(source.packed, source_field)
        self.assertEqual(destination.packed, destination_field)
        self.assertEqual(aux_sec_hdr_bytes, aux_sec_hdr_field)

    def test_should_create_key_and_nonce_and_auth_data_when_create_key_and_nonce_and_auth_data_is_called(self):
        # GIVEN
        info = common.MessageInfo()
        info.source_mac_address = common.MacAddress.from_eui64(any_eui64())

        info.source_ipv6 = any_ip_address()
        info.destination_ipv6 = any_ip_address()

        info.aux_sec_hdr = any_auxiliary_security_header()
        info.aux_sec_hdr_bytes = convert_aux_sec_hdr_to_bytearray(info.aux_sec_hdr)

        creator = net_crypto.MleCryptoMaterialCreator(network_key)

        # WHEN
        # The returned key is not checked by this test.
        _key, nonce, auth_data = creator.create_key_and_nonce_and_authenticated_data(info)

        # THEN
        expected_nonce = info.source_mac_address.mac_address + struct.pack(
            ">LB", info.aux_sec_hdr.frame_counter, info.aux_sec_hdr.security_level)
        self.assertEqual(expected_nonce, nonce)

        expected_auth_data = info.source_ipv6.packed + info.destination_ipv6.packed + info.aux_sec_hdr_bytes
        self.assertEqual(expected_auth_data, auth_data)
343
344
class TestAuxiliarySecurityHeader(unittest.TestCase):
    """Checks that net_crypto.AuxiliarySecurityHeader properties return the constructor values."""

    def test_should_return_key_id_mode_value_when_key_id_mode_property_is_called(self):
        # GIVEN
        mode = any_key_id_mode()

        header = net_crypto.AuxiliarySecurityHeader(mode, any_security_level(), any_frame_counter(),
                                                    any_key_id(mode))

        # WHEN / THEN
        self.assertEqual(mode, header.key_id_mode)

    def test_should_return_security_level_value_when_security_level_property_is_called(self):
        # GIVEN
        level = any_security_level()
        mode = any_key_id_mode()

        header = net_crypto.AuxiliarySecurityHeader(mode, level, any_frame_counter(), any_key_id(mode))

        # WHEN / THEN
        self.assertEqual(level, header.security_level)

    def test_should_return_frame_counter_value_when_frame_counter_property_is_called(self):
        # GIVEN
        counter = any_frame_counter()
        mode = any_key_id_mode()

        header = net_crypto.AuxiliarySecurityHeader(mode, any_security_level(), counter, any_key_id(mode))

        # WHEN / THEN
        self.assertEqual(counter, header.frame_counter)

    def test_should_return_key_id_value_when_key_id_property_is_called(self):
        # GIVEN
        mode = any_key_id_mode()
        identifier = any_key_id(mode)

        header = net_crypto.AuxiliarySecurityHeader(mode, any_security_level(), any_frame_counter(), identifier)

        # WHEN / THEN
        self.assertEqual(identifier, header.key_id)

    def test_should_return_sequence_counter_value_when_sequence_counter_property_is_called(self):
        # GIVEN
        mode = 2
        identifier = any_key_id(mode)

        header = net_crypto.AuxiliarySecurityHeader(mode, any_security_level(), any_frame_counter(), identifier)

        # WHEN
        sequence_counter = header.sequence_counter

        # THEN
        # The expected value is the first four key id bytes read as a big-endian unsigned integer.
        (expected_sequence_counter,) = struct.unpack(">I", identifier[:4])
        self.assertEqual(expected_sequence_counter, sequence_counter)
415
416
class TestAuxiliarySecurityHeaderFactory(unittest.TestCase):
    """Checks that net_crypto.AuxiliarySecurityHeaderFactory parses a serialized header."""

    def test_should_create_AuxiliarySecurityHeader_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        key_id_mode = any_key_id_mode()
        sec_lvl = any_security_level()
        frame_counter = any_frame_counter()
        key_id = any_key_id(key_id_mode)

        factory = net_crypto.AuxiliarySecurityHeaderFactory()

        # Serialized form: security-control byte, little-endian 32-bit frame counter, key id.
        # Parentheses make the existing precedence explicit (<< binds tighter than |).
        security_control = sec_lvl | (key_id_mode << 3)
        data = bytearray([security_control]) + struct.pack("<I", frame_counter) + key_id

        # WHEN
        aux_sec_hdr = factory.parse(io.BytesIO(data), common.MessageInfo())

        # THEN
        # assertIsInstance reports the actual type on failure, unlike assertTrue(isinstance(...)).
        self.assertIsInstance(aux_sec_hdr, net_crypto.AuxiliarySecurityHeader)
        self.assertEqual(key_id_mode, aux_sec_hdr.key_id_mode)
        self.assertEqual(sec_lvl, aux_sec_hdr.security_level)
        self.assertEqual(frame_counter, aux_sec_hdr.frame_counter)
        # NOTE(review): the parsed key id is not asserted here; consider also checking
        # aux_sec_hdr.key_id once the type returned by the parser is confirmed.
438
439
# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
442