1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import io 31import random 32import struct 33import unittest 34import ipaddress 35 36import common 37import net_crypto 38 39network_key = bytearray( 40 [0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff]) 41 42 43def convert_aux_sec_hdr_to_bytearray(aux_sec_hdr): 44 data = bytearray([aux_sec_hdr.security_level | ((aux_sec_hdr.key_id_mode & 0x03) << 3)]) 45 data += struct.pack("<L", aux_sec_hdr.frame_counter) 46 data += aux_sec_hdr.key_id 47 return data 48 49 50def any_eui64(): 51 return bytearray([random.getrandbits(8) for _ in range(8)]) 52 53 54def any_security_level(): 55 return random.getrandbits(3) 56 57 58def any_key_id_mode(): 59 """ 60 Only key id mode 2. 61 """ 62 return 2 63 64 65def any_key_id(key_id_mode): 66 if key_id_mode == 2: 67 length = 5 68 69 return bytearray([random.getrandbits(8) for _ in range(length)]) 70 71 72def any_auxiliary_security_header(): 73 key_id_mode = any_key_id_mode() 74 key_id = any_key_id(key_id_mode) 75 76 return net_crypto.AuxiliarySecurityHeader(key_id_mode, any_security_level(), any_frame_counter(), key_id) 77 78 79def any_frame_counter(): 80 return random.getrandbits(32) 81 82 83def any_ip_address(): 84 ip_address_bytes = bytearray([random.getrandbits(8) for _ in range(16)]) 85 return ipaddress.ip_address(bytes(ip_address_bytes)) 86 87 88def any_data(length=None): 89 length = length if length is not None else random.randint(0, 128) 90 return bytearray([random.getrandbits(8) for _ in range(length)]) 91 92 93def any_network_key(): 94 return bytearray([random.getrandbits(8) for _ in range(16)]) 95 96 97class TestCryptoEngine(unittest.TestCase): 98 99 def test_should_decrypt_bytearray_to_mle_message_when_decrypt_method_is_called(self): 100 # GIVEN 101 message_info = common.MessageInfo() 102 message_info.source_mac_address = common.MacAddress.from_eui64( 103 bytearray([0x00, 0x35, 0xcc, 0x94, 0xd7, 0x7a, 0x07, 0xe8])) 104 105 message_info.source_ipv6 = "fe80::235:cc94:d77a:07e8" 106 message_info.destination_ipv6 = "ff02::2" 107 108 message_info.aux_sec_hdr = net_crypto.AuxiliarySecurityHeader(key_id_mode=2, 109 security_level=5, 110 frame_counter=262165, 111 key_id=bytearray([0x00, 0x00, 0x00, 0x00, 0x01])) 112 message_info.aux_sec_hdr_bytes = convert_aux_sec_hdr_to_bytearray(message_info.aux_sec_hdr) 113 114 data = bytearray([ 115 0x9a, 0x5a, 0x9a, 0x5b, 0xba, 0x25, 0x9c, 0x5e, 0x58, 0xa2, 0x7e, 0x75, 0x74, 0xef, 0x79, 0xbc, 0x4f, 0xa3, 116 0xf9, 0xae, 0xa8, 0x34, 0xf6, 0xf2, 0x37, 0x21, 0x93, 0x60 117 ]) 118 119 mic = bytearray([0xe1, 0xb5, 0xa2, 0x53]) 120 121 net_crypto_engine = net_crypto.CryptoEngine(net_crypto.MleCryptoMaterialCreator(network_key)) 122 123 # WHEN 124 mle_msg = net_crypto_engine.decrypt(data, mic, message_info) 125 126 # THEN 127 expected_mle_msg = bytearray([ 128 0x04, 0x00, 0x02, 0x00, 0x00, 0x09, 0x0b, 0x8f, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x01, 0xf1, 129 0x0b, 0x08, 0x65, 0x5e, 0x0f, 0x83, 0x40, 0xc7, 0x83, 0x31 130 ]) 131 self.assertEqual(expected_mle_msg, mle_msg) 132 133 def test_should_encrypt_mle_message_to_bytearray_when_encrypt_method_is_called(self): 134 # GIVEN 135 message_info = common.MessageInfo() 136 message_info.source_mac_address = common.MacAddress.from_eui64( 137 bytearray([0x00, 0x35, 0xcc, 0x94, 0xd7, 0x7a, 0x07, 0xe8])) 138 139 message_info.source_ipv6 = "fe80::235:cc94:d77a:07e8" 140 message_info.destination_ipv6 = "ff02::2" 141 142 message_info.aux_sec_hdr = net_crypto.AuxiliarySecurityHeader(key_id_mode=2, 143 security_level=5, 144 frame_counter=262165, 145 key_id=bytearray([0x00, 0x00, 0x00, 0x00, 0x01])) 146 message_info.aux_sec_hdr_bytes = convert_aux_sec_hdr_to_bytearray(message_info.aux_sec_hdr) 147 148 mle_msg = bytearray([ 149 0x04, 0x00, 0x02, 0x00, 0x00, 0x09, 0x0b, 0x8f, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x01, 0xf1, 150 0x0b, 0x08, 0x65, 0x5e, 0x0f, 0x83, 0x40, 0xc7, 0x83, 0x31 151 ]) 152 153 net_crypto_engine = net_crypto.CryptoEngine(net_crypto.MleCryptoMaterialCreator(network_key)) 154 155 # WHEN 156 encrypted_data, mic = net_crypto_engine.encrypt(mle_msg, message_info) 157 158 # THEN 159 expected_encrypted_data = bytearray([ 160 0x9a, 0x5a, 0x9a, 0x5b, 0xba, 0x25, 0x9c, 0x5e, 0x58, 0xa2, 0x7e, 0x75, 0x74, 0xef, 0x79, 0xbc, 0x4f, 0xa3, 161 0xf9, 0xae, 0xa8, 0x34, 0xf6, 0xf2, 0x37, 0x21, 0x93, 0x60, 0xe1, 0xb5, 0xa2, 0x53 162 ]) 163 164 self.assertEqual(expected_encrypted_data, encrypted_data + mic) 165 166 def test_should_encrypt_and_decrypt_random_data_content_when_proper_methods_are_called(self): 167 # GIVEN 168 data = any_data() 169 170 network_key = any_network_key() 171 172 key_id_mode = 2 173 security_level = 5 174 175 message_info = common.MessageInfo() 176 message_info.source_mac_address = common.MacAddress.from_eui64(any_eui64()) 177 178 message_info.source_ipv6 = any_ip_address() 179 message_info.destination_ipv6 = any_ip_address() 180 181 message_info.aux_sec_hdr = net_crypto.AuxiliarySecurityHeader(key_id_mode=key_id_mode, 182 security_level=security_level, 183 frame_counter=any_frame_counter(), 184 key_id=any_key_id(key_id_mode)) 185 message_info.aux_sec_hdr_bytes = convert_aux_sec_hdr_to_bytearray(message_info.aux_sec_hdr) 186 187 net_crypto_engine = net_crypto.CryptoEngine(net_crypto.MleCryptoMaterialCreator(network_key)) 188 189 # WHEN 190 enc_data, mic = net_crypto_engine.encrypt(data, message_info) 191 dec_data = net_crypto_engine.decrypt(enc_data, mic, message_info) 192 193 # THEN 194 self.assertEqual(data, dec_data) 195 196 197class TestCryptoMaterialCreator(unittest.TestCase): 198 """ Key generaion was described in Thread specification. 199 200 Read more: Thread 1.1.0 Specification Candidate Final - 7.1.4 Key Generation 201 202 Test vectors was taken from thread specification. 203 """ 204 205 def test_should_generate_mle_and_mac_key_when_generate_keys_method_is_called_with_sequence_counter_equal_0(self): 206 """ 207 7.1.4.1 Test Vector 1 208 """ 209 210 # GIVEN 211 sequence_counter = 0 212 213 creator = net_crypto.CryptoMaterialCreator(network_key) 214 215 # WHEN 216 mle_key, mac_key = creator._generate_keys(sequence_counter) 217 218 # THEN 219 self.assertEqual( 220 mle_key, 221 bytearray([0x54, 0x45, 0xf4, 0x15, 0x8f, 0xd7, 0x59, 0x12, 0x17, 0x58, 0x09, 0xf8, 0xb5, 0x7a, 0x66, 222 0xa4])) 223 self.assertEqual( 224 mac_key, 225 bytearray([0xde, 0x89, 0xc5, 0x3a, 0xf3, 0x82, 0xb4, 0x21, 0xe0, 0xfd, 0xe5, 0xa9, 0xba, 0xe3, 0xbe, 226 0xf0])) 227 228 def test_should_generate_mle_and_mac_key_when_generate_keys_method_is_called_with_sequence_counter_equal_1(self): 229 """ 230 7.1.4.2 Test Vector 2 231 """ 232 233 # GIVEN 234 sequence_counter = 1 235 236 creator = net_crypto.CryptoMaterialCreator(network_key) 237 238 # WHEN 239 mle_key, mac_key = creator._generate_keys(sequence_counter) 240 241 # THEN 242 self.assertEqual( 243 mle_key, 244 bytearray([0x8f, 0x4c, 0xd1, 0xa2, 0x7d, 0x95, 0xc0, 0x7d, 0x12, 0xdb, 0x89, 0x74, 0xbd, 0x61, 0x5c, 245 0x13])) 246 self.assertEqual( 247 mac_key, 248 bytearray([0x9b, 0xe0, 0xd1, 0xaf, 0x7b, 0xd8, 0x73, 0x50, 0xde, 0xab, 0xcd, 0xd0, 0x7f, 0xeb, 0xb9, 249 0xd5])) 250 251 def test_should_generate_mle_and_mac_key_when_generate_keys_method_is_called_with_sequence_counter_equal_2(self): 252 """ 253 7.1.4.3 Test Vector 3 254 """ 255 256 # GIVEN 257 sequence_counter = 2 258 259 creator = net_crypto.CryptoMaterialCreator(network_key) 260 261 # WHEN 262 mle_key, mac_key = creator._generate_keys(sequence_counter) 263 264 # THEN 265 self.assertEqual( 266 mle_key, 267 bytearray([0x01, 0x6e, 0x2a, 0xb8, 0xec, 0x88, 0x87, 0x96, 0x87, 0xa7, 0x2e, 0x0a, 0x35, 0x7e, 0xcf, 268 0x2a])) 269 self.assertEqual( 270 mac_key, 271 bytearray([0x56, 0x41, 0x09, 0xe9, 0xd2, 0xaa, 0xd7, 0xf7, 0x23, 0xec, 0x3b, 0x96, 0x11, 0x0e, 0xef, 272 0xa3])) 273 274 275class TestMleCryptoMaterialCreator(unittest.TestCase): 276 277 def test_should_create_nonce_when_create_nonce_method_is_called(self): 278 # GIVEN 279 source_eui64 = any_eui64() 280 frame_counter = any_frame_counter() 281 security_level = any_security_level() 282 283 creator = net_crypto.MleCryptoMaterialCreator(network_key) 284 285 # WHEN 286 nonce = creator._create_nonce(source_eui64, frame_counter, security_level) 287 288 # THEN 289 nonce_bytes = io.BytesIO(nonce) 290 291 self.assertEqual(source_eui64, nonce_bytes.read(8)) 292 self.assertEqual(struct.pack(">L", frame_counter), nonce_bytes.read(4)) 293 self.assertEqual(security_level, ord(nonce_bytes.read(1))) 294 295 def test_should_create_authenticated_data_when_create_authenticated_data_method_is_called(self): 296 """ 297 Only Key id mode 2. 298 Length of the Auxiliary Security Header is constantly equal 10. 299 """ 300 301 # GIVEN 302 source_address = any_ip_address() 303 destination_address = any_ip_address() 304 auxiliary_security_header_bytes = convert_aux_sec_hdr_to_bytearray(any_auxiliary_security_header()) 305 306 creator = net_crypto.MleCryptoMaterialCreator(network_key) 307 308 # WHEN 309 authenticated_data = creator._create_authenticated_data(source_address, destination_address, 310 auxiliary_security_header_bytes) 311 312 # THEN 313 authenticated_data_bytes = io.BytesIO(authenticated_data) 314 315 self.assertEqual(source_address.packed, authenticated_data_bytes.read(16)) 316 self.assertEqual(destination_address.packed, authenticated_data_bytes.read(16)) 317 self.assertEqual(auxiliary_security_header_bytes, authenticated_data_bytes.read(10)) 318 319 def test_should_create_key_and_nonce_and_auth_data_when_create_key_and_nonce_and_auth_data_is_called(self): 320 # GIVEN 321 message_info = common.MessageInfo() 322 message_info.source_mac_address = common.MacAddress.from_eui64(any_eui64()) 323 324 message_info.source_ipv6 = any_ip_address() 325 message_info.destination_ipv6 = any_ip_address() 326 327 message_info.aux_sec_hdr = any_auxiliary_security_header() 328 message_info.aux_sec_hdr_bytes = convert_aux_sec_hdr_to_bytearray(message_info.aux_sec_hdr) 329 330 creator = net_crypto.MleCryptoMaterialCreator(network_key) 331 332 # WHEN 333 key, nonce, auth_data = creator.create_key_and_nonce_and_authenticated_data(message_info) 334 335 # THEN 336 self.assertEqual( 337 message_info.source_mac_address.mac_address + 338 struct.pack(">LB", message_info.aux_sec_hdr.frame_counter, message_info.aux_sec_hdr.security_level), nonce) 339 340 self.assertEqual( 341 message_info.source_ipv6.packed + message_info.destination_ipv6.packed + message_info.aux_sec_hdr_bytes, 342 auth_data) 343 344 345class TestAuxiliarySecurityHeader(unittest.TestCase): 346 347 def test_should_return_key_id_mode_value_when_key_id_mode_property_is_called(self): 348 # GIVEN 349 key_id_mode = any_key_id_mode() 350 351 aux_sec_hdr_obj = net_crypto.AuxiliarySecurityHeader(key_id_mode, any_security_level(), any_frame_counter(), 352 any_key_id(key_id_mode)) 353 354 # WHEN 355 actual_key_id_mode = aux_sec_hdr_obj.key_id_mode 356 357 # THEN 358 self.assertEqual(key_id_mode, actual_key_id_mode) 359 360 def test_should_return_security_level_value_when_security_level_property_is_called(self): 361 # GIVEN 362 security_level = any_security_level() 363 key_id_mode = any_key_id_mode() 364 365 aux_sec_hdr_obj = net_crypto.AuxiliarySecurityHeader(key_id_mode, security_level, any_frame_counter(), 366 any_key_id(key_id_mode)) 367 368 # WHEN 369 actual_security_level = aux_sec_hdr_obj.security_level 370 371 # THEN 372 self.assertEqual(security_level, actual_security_level) 373 374 def test_should_return_frame_counter_value_when_frame_counter_property_is_called(self): 375 # GIVEN 376 frame_counter = any_frame_counter() 377 key_id_mode = any_key_id_mode() 378 379 aux_sec_hdr_obj = net_crypto.AuxiliarySecurityHeader(key_id_mode, any_security_level(), frame_counter, 380 any_key_id(key_id_mode)) 381 382 # WHEN 383 actual_frame_counter = aux_sec_hdr_obj.frame_counter 384 385 # THEN 386 self.assertEqual(frame_counter, actual_frame_counter) 387 388 def test_should_return_key_id_value_when_key_id_property_is_called(self): 389 # GIVEN 390 key_id_mode = any_key_id_mode() 391 key_id = any_key_id(key_id_mode) 392 393 aux_sec_hdr_obj = net_crypto.AuxiliarySecurityHeader(key_id_mode, any_security_level(), any_frame_counter(), 394 key_id) 395 396 # WHEN 397 actual_key_id = aux_sec_hdr_obj.key_id 398 399 # THEN 400 self.assertEqual(key_id, actual_key_id) 401 402 def test_should_return_sequence_counter_value_when_sequence_counter_property_is_called(self): 403 # GIVEN 404 key_id_mode = 2 405 key_id = any_key_id(key_id_mode) 406 407 aux_sec_hdr_obj = net_crypto.AuxiliarySecurityHeader(key_id_mode, any_security_level(), any_frame_counter(), 408 key_id) 409 410 # WHEN 411 actual_sequence_counter = aux_sec_hdr_obj.sequence_counter 412 413 # THEN 414 self.assertEqual(struct.unpack(">I", key_id[:4])[0], actual_sequence_counter) 415 416 417class TestAuxiliarySecurityHeaderFactory(unittest.TestCase): 418 419 def test_should_create_AuxiliarySecurityHeader_from_bytearray_when_parse_method_is_called(self): 420 # GIVEN 421 key_id_mode = any_key_id_mode() 422 sec_lvl = any_security_level() 423 frame_counter = any_frame_counter() 424 key_id = any_key_id(key_id_mode) 425 426 factory = net_crypto.AuxiliarySecurityHeaderFactory() 427 428 data = bytearray([sec_lvl | key_id_mode << 3]) + struct.pack("<I", frame_counter) + key_id 429 430 # WHEN 431 aux_sec_hdr = factory.parse(io.BytesIO(data), common.MessageInfo()) 432 433 # THEN 434 self.assertTrue(isinstance(aux_sec_hdr, net_crypto.AuxiliarySecurityHeader)) 435 self.assertEqual(key_id_mode, aux_sec_hdr.key_id_mode) 436 self.assertEqual(sec_lvl, aux_sec_hdr.security_level) 437 self.assertEqual(frame_counter, aux_sec_hdr.frame_counter) 438 439 440if __name__ == "__main__": 441 unittest.main() 442