1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#

import io
import random
import struct
import unittest

import common
import config
import mle
import network_data

# Random fixture generators. Every any_*() helper returns a fresh random value on each call; the tests in this
# file feed those values to the mle classes and factories and check that they come back unchanged.


def any_address():
    """Return a random 16-bit integer."""
    return random.getrandbits(16)


def any_receiver():
    """Return a random single bit (0 or 1)."""
    return random.getrandbits(1)


def any_secure():
    """Return a random single bit (0 or 1)."""
    return random.getrandbits(1)


def any_device_type():
    """Return a random single bit (0 or 1)."""
    return random.getrandbits(1)


def any_network_data():
    """Return a random single bit (0 or 1)."""
    return random.getrandbits(1)


# Expected field values for every 4-bit mode value, keyed by that value:
# bit 3 -> "receiver", bit 2 -> "secure", bit 1 -> "device_type", bit 0 -> "network_data".
mode_map = {
    mode: {
        "receiver": (mode >> 3) & 0x01,
        "secure": (mode >> 2) & 0x01,
        "device_type": (mode >> 1) & 0x01,
        "network_data": mode & 0x01
    } for mode in range(0x10)
}


def any_mode():
    """Return a random 4-bit integer; every possible value is a key of mode_map."""
    return random.getrandbits(4)


def any_timeout():
    """Return a random 32-bit integer."""
    return random.getrandbits(32)


def any_challenge():
    """Return a bytearray of 4 to 8 random bytes."""
    length = random.randint(4, 8)
    return bytearray(random.getrandbits(8) for _ in range(length))


def any_response():
    """Return a bytearray of 4 to 8 random bytes."""
    length = random.randint(4, 8)
    return bytearray(random.getrandbits(8) for _ in range(length))


def any_link_layer_frame_counter():
    """Return a random 32-bit integer."""
    return random.getrandbits(32)


def any_mle_frame_counter():
    """Return a random 32-bit integer."""
    return random.getrandbits(32)


def any_output():
    """Return a random 2-bit integer."""
    return random.getrandbits(2)


def any_input():
    """Return a random 2-bit integer."""
    return random.getrandbits(2)


def any_route():
    """Return a random 4-bit integer."""
    return random.getrandbits(4)


def any_router_id_mask():
    """Return a random 64-bit integer."""
    return random.getrandbits(64)


def any_link_quality_and_route_data(length=None):
    """Return a list of random byte values.

    Args:
        length: number of entries to generate; a random count between 0 and 63 is used when None.

    Returns:
        A list of `length` integers, each in the range 0..255.
    """
    if length is None:
        length = random.randint(0, 63)

    return [random.getrandbits(8) for _ in range(length)]


def any_partition_id():
    """Return a random 32-bit integer."""
    return random.getrandbits(32)


def any_weighting():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_data_version():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_stable_data_version():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_leader_router_id():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


# Expected field values for every scan mask byte produced by any_scan_mask():
# bit 7 -> "router", bit 6 -> "end_device".
scan_mask_map = {
    0x00: {
        "router": 0,
        "end_device": 0
    },
    0x40: {
        "router": 0,
        "end_device": 1
    },
    0x80: {
        "router": 1,
        "end_device": 0
    },
    0xC0: {
        "router": 1,
        "end_device": 1
    },
}


def any_scan_mask_router():
    """Return a random single bit (0 or 1)."""
    return random.getrandbits(1)


def any_scan_mask_end_device():
    """Return a random single bit (0 or 1)."""
    return random.getrandbits(1)


def any_scan_mask():
    """Return a random byte with only its two top bits populated; always a key of scan_mask_map."""
    return (random.getrandbits(2) << 6)


def any_link_margin():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_status():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_version():
    """Return a random 16-bit integer."""
    return random.getrandbits(16)


def any_channel_page():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_channel():
    """Return a random 16-bit integer."""
    return random.getrandbits(16)


def any_pan_id():
    """Return a random 16-bit integer."""
    return random.getrandbits(16)


def any_timestamp_seconds():
    """Return a random 48-bit integer."""
    return random.getrandbits(48)


def any_timestamp_ticks():
    """Return a random 15-bit integer."""
    return random.getrandbits(15)


def any_u():
    """Return a random single bit (0 or 1)."""
    return random.getrandbits(1)


def any_pp():
    """Return a random byte with only its two top bits populated (0x00, 0x40, 0x80 or 0xC0)."""
    return (random.getrandbits(2) << 6)


def any_link_quality_3():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_link_quality_2():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_link_quality_1():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_leader_cost():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_id_sequence():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_active_routers():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_sed_buffer_size():
    """Return a random 16-bit integer."""
    return random.getrandbits(16)


def any_sed_datagram_count():
    """Return a random 8-bit integer."""
    return random.getrandbits(8)


def any_tlvs(length=None):
    """Return a list of random byte values.

    Args:
        length: number of entries to generate; a random count between 0 and 16 is used when None.

    Returns:
        A list of `length` integers, each in the range 0..255.
    """
    if length is None:
        length = random.randint(0, 16)

    return [random.getrandbits(8) for _ in range(length)]


def any_cid():
    """Return a random 4-bit integer."""
    return random.getrandbits(4)


def any_iid():
    """Return a bytearray of 8 random bytes."""
    return bytearray([random.getrandbits(8) for _ in range(8)])


def any_ipv6_address():
    """Return a bytearray of 16 random bytes."""
    return bytearray([random.getrandbits(8) for _ in range(16)])


def any_addresses():
    """Return a two-element list: one random mle.AddressCompressed followed by one random mle.AddressFull."""
    addresses = [mle.AddressCompressed(any_cid(), any_iid()), mle.AddressFull(any_ipv6_address())]

    return addresses


def any_key_id_mode():
    """Return a random 2-bit integer; every possible value is accepted by any_key_id()."""
    return random.getrandbits(2)


def any_security_level():
    """Return a random 3-bit integer."""
    return random.getrandbits(3)


def any_frame_counter():
    """Return a random 32-bit integer."""
    return random.getrandbits(32)


def any_key_id(key_id_mode):
    """Return a random key identifier whose length is selected by key_id_mode.

    Args:
        key_id_mode: 0, 1, 2 or 3, selecting an identifier of 0, 1, 5 or 9 bytes respectively.

    Returns:
        A bytearray of random bytes with the selected length.

    Raises:
        ValueError: if key_id_mode is not one of the supported modes.
    """
    lengths = {0: 0, 1: 1, 2: 5, 3: 9}

    # Fail with a clear message instead of tripping over an unassigned local further down.
    if key_id_mode not in lengths:
        raise ValueError("Unsupported key_id_mode: {!r}".format(key_id_mode))

    return bytearray([random.getrandbits(8) for _ in range(lengths[key_id_mode])])


def any_eui64():
    """Return a bytearray of 8 random bytes."""
    return bytearray([random.getrandbits(8) for _ in range(8)])


class TestSourceAddress(unittest.TestCase):
    """Checks that mle.SourceAddress exposes the address it was constructed with."""

    def test_should_return_address_value_when_address_property_is_called(self):
        # GIVEN
        address = any_address()

        source_address = mle.SourceAddress(address)

        # WHEN
        actual_address = source_address.address

        # THEN
        self.assertEqual(address, actual_address)


class TestSourceAddressFactory(unittest.TestCase):
    """Checks that mle.SourceAddressFactory.parse builds a mle.SourceAddress from packed bytes."""

    def test_should_create_SourceAddress_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        address = any_address()

        factory = mle.SourceAddressFactory()

        # The address is fed to the parser as a big-endian unsigned 16-bit value.
        data = struct.pack(">H", address)

        # WHEN
        actual_source_address = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_source_address, mle.SourceAddress)
        self.assertEqual(address, actual_source_address.address)


class TestMode(unittest.TestCase):
    """Checks that mle.Mode exposes each of the four values it was constructed with."""

    def test_should_return_receiver_value_when_receiver_property_is_called(self):
        # GIVEN
        receiver = any_receiver()

        mode = mle.Mode(receiver, any_secure(), any_device_type(), any_network_data())

        # WHEN
        actual_receiver = mode.receiver

        # THEN
        self.assertEqual(receiver, actual_receiver)

    def test_should_return_secure_value_when_secure_property_is_called(self):
        # GIVEN
        secure = any_secure()

        mode = mle.Mode(any_receiver(), secure, any_device_type(), any_network_data())

        # WHEN
        actual_secure = mode.secure

        # THEN
        self.assertEqual(secure, actual_secure)

    def test_should_return_device_type_value_when_device_type_property_is_called(self):
        # GIVEN
        device_type = any_device_type()

        mode = mle.Mode(any_receiver(), any_secure(), device_type, any_network_data())

        # WHEN
        actual_device_type = mode.device_type

        # THEN
        self.assertEqual(device_type, actual_device_type)

    def test_should_return_network_data_value_when_network_data_property_is_called(self):
        # GIVEN
        network_data = any_network_data()

        mode = mle.Mode(any_receiver(), any_secure(), any_device_type(), network_data)

        # WHEN
        actual_network_data = mode.network_data

        # THEN
        self.assertEqual(network_data, actual_network_data)


class TestModeFactory(unittest.TestCase):
    """Checks that mle.ModeFactory.parse decodes a mode byte into the fields listed in mode_map."""

    def test_should_create_Mode_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        mode = any_mode()

        factory = mle.ModeFactory()

        data = bytearray([mode])

        # WHEN
        actual_mode = factory.parse(io.BytesIO(data), dict())

        # THEN
        expected = mode_map[mode]

        self.assertIsInstance(actual_mode, mle.Mode)
        self.assertEqual(expected["receiver"], actual_mode.receiver)
        self.assertEqual(expected["secure"], actual_mode.secure)
        self.assertEqual(expected["device_type"], actual_mode.device_type)
        self.assertEqual(expected["network_data"], actual_mode.network_data)


class TestTimeout(unittest.TestCase):
    """Checks that mle.Timeout exposes the timeout it was constructed with."""

    def test_should_return_timeout_value_when_timeout_property_is_called(self):
        # GIVEN
        timeout = any_timeout()

        timeout_obj = mle.Timeout(timeout)

        # WHEN
        actual_timeout = timeout_obj.timeout

        # THEN
        self.assertEqual(timeout, actual_timeout)


class TestTimeoutFactory(unittest.TestCase):
    """Checks that mle.TimeoutFactory.parse builds a mle.Timeout from packed bytes."""

    def test_should_create_Timeout_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        timeout = any_timeout()

        factory = mle.TimeoutFactory()

        # The timeout is fed to the parser as a big-endian unsigned 32-bit value.
        data = struct.pack(">I", timeout)

        # WHEN
        actual_timeout = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_timeout, mle.Timeout)
        self.assertEqual(timeout, actual_timeout.timeout)
class TestChallenge(unittest.TestCase):
    """Checks that mle.Challenge exposes the challenge it was constructed with."""

    def test_should_return_challenge_value_when_challenge_property_is_called(self):
        # GIVEN
        challenge = any_challenge()

        challenge_obj = mle.Challenge(challenge)

        # WHEN
        actual_challenge = challenge_obj.challenge

        # THEN
        self.assertEqual(challenge, actual_challenge)


class TestChallengeFactory(unittest.TestCase):
    """Checks that mle.ChallengeFactory.parse builds a mle.Challenge from raw bytes."""

    def test_should_create_Challenge_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        challenge = any_challenge()

        factory = mle.ChallengeFactory()

        data = challenge

        # WHEN
        actual_challenge = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_challenge, mle.Challenge)
        self.assertEqual(challenge, actual_challenge.challenge)


class TestResponse(unittest.TestCase):
    """Checks that mle.Response exposes the response it was constructed with."""

    def test_should_return_response_value_when_response_property_is_called(self):
        # GIVEN
        response = any_response()

        response_obj = mle.Response(response)

        # WHEN
        actual_response = response_obj.response

        # THEN
        self.assertEqual(response, actual_response)


class TestResponseFactory(unittest.TestCase):
    """Checks that mle.ResponseFactory.parse builds a mle.Response from raw bytes."""

    # NOTE(review): the method name says "Challenge" although it exercises ResponseFactory; the name is kept
    # unchanged so the test ID stays stable.
    def test_should_create_Challenge_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        response = any_response()

        factory = mle.ResponseFactory()

        data = response

        # WHEN
        actual_response = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_response, mle.Response)
        self.assertEqual(response, actual_response.response)


class TestLinkLayerFrameCounter(unittest.TestCase):
    """Checks that mle.LinkLayerFrameCounter exposes the frame counter it was constructed with."""

    def test_should_return_frame_counter_value_when_frame_counter_property_is_called(self):
        # GIVEN
        link_layer_frame_counter = any_link_layer_frame_counter()

        link_layer_frame_counter_obj = mle.LinkLayerFrameCounter(link_layer_frame_counter)

        # WHEN
        actual_link_layer_frame_counter = link_layer_frame_counter_obj.frame_counter

        # THEN
        self.assertEqual(link_layer_frame_counter, actual_link_layer_frame_counter)


class TestLinkLayerFrameCounterFactory(unittest.TestCase):
    """Checks that mle.LinkLayerFrameCounterFactory.parse builds a mle.LinkLayerFrameCounter from packed bytes."""

    def test_should_create_LinkLayerFrameCounter_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        link_layer_frame_counter = any_link_layer_frame_counter()

        factory = mle.LinkLayerFrameCounterFactory()

        # The counter is fed to the parser as a big-endian unsigned 32-bit value.
        data = struct.pack(">I", link_layer_frame_counter)

        # WHEN
        actual_link_layer_frame_counter = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_link_layer_frame_counter, mle.LinkLayerFrameCounter)
        self.assertEqual(link_layer_frame_counter, actual_link_layer_frame_counter.frame_counter)


class TestMleFrameCounter(unittest.TestCase):
    """Checks that mle.MleFrameCounter exposes the frame counter it was constructed with."""

    def test_should_return_frame_counter_value_when_frame_counter_property_is_called(self):
        # GIVEN
        mle_frame_counter = any_mle_frame_counter()

        mle_frame_counter_obj = mle.MleFrameCounter(mle_frame_counter)

        # WHEN
        actual_mle_frame_counter = mle_frame_counter_obj.frame_counter

        # THEN
        self.assertEqual(mle_frame_counter, actual_mle_frame_counter)


class TestMleFrameCounterFactory(unittest.TestCase):
    """Checks that mle.MleFrameCounterFactory.parse builds a mle.MleFrameCounter from packed bytes."""

    def test_should_create_MleFrameCounter_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        mle_frame_counter = any_mle_frame_counter()

        factory = mle.MleFrameCounterFactory()

        # The counter is fed to the parser as a big-endian unsigned 32-bit value.
        data = struct.pack(">I", mle_frame_counter)

        # WHEN
        actual_mle_frame_counter = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_mle_frame_counter, mle.MleFrameCounter)
        self.assertEqual(mle_frame_counter, actual_mle_frame_counter.frame_counter)


class TestLinkQualityAndRouteData(unittest.TestCase):
    """Checks that mle.LinkQualityAndRouteData exposes the output, input and route it was constructed with."""

    def test_should_return_output_value_when_output_property_is_called(self):
        # GIVEN
        output = any_output()

        lqrd = mle.LinkQualityAndRouteData(output, any_input(), any_route())

        # WHEN
        actual_output = lqrd.output

        # THEN
        self.assertEqual(output, actual_output)

    def test_should_return_input_value_when_input_property_is_called(self):
        # GIVEN
        _input = any_input()

        lqrd = mle.LinkQualityAndRouteData(any_output(), _input, any_route())

        # WHEN
        actual_input = lqrd.input

        # THEN
        self.assertEqual(_input, actual_input)

    def test_should_return_route_value_when_route_property_is_called(self):
        # GIVEN
        route = any_route()

        lqrd = mle.LinkQualityAndRouteData(any_output(), any_input(), route)

        # WHEN
        actual_route = lqrd.route

        # THEN
        self.assertEqual(route, actual_route)


class TestLinkQualityAndRouteDataFactory(unittest.TestCase):
    """Checks that mle.LinkQualityAndRouteDataFactory.parse splits one byte into output, input and route."""

    def test_should_create_LinkQualityAndRouteData_from_well_known_byte_when_parse_method_is_called(self):
        # GIVEN
        factory = mle.LinkQualityAndRouteDataFactory()

        # 0x66 == 0b01_10_0110: output 1, input 2, route 6.
        data = bytearray([0x66])

        # WHEN
        actual_lqrd = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertEqual(1, actual_lqrd.output)
        self.assertEqual(2, actual_lqrd.input)
        self.assertEqual(6, actual_lqrd.route)

    def test_should_create_LinkQualityAndRouteData_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        output = any_output()
        _input = any_input()
        route = any_route()

        # Pack the three fields into one byte: output in bits 7-6, input in bits 5-4, route in bits 3-0.
        lqrd = (output << 6) | (_input << 4) | route

        factory = mle.LinkQualityAndRouteDataFactory()

        data = bytearray([lqrd])

        # WHEN
        actual_lqrd = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_lqrd, mle.LinkQualityAndRouteData)
        self.assertEqual(output, actual_lqrd.output)
        self.assertEqual(_input, actual_lqrd.input)
        self.assertEqual(route, actual_lqrd.route)


class TestRoute64(unittest.TestCase):
    """Checks that mle.Route64 exposes each of the three values it was constructed with."""

    def test_should_return_id_sequence_value_when_id_sequence_property_is_called(self):
        # GIVEN
        id_sequence = any_id_sequence()

        route64_obj = mle.Route64(id_sequence, any_router_id_mask(), any_link_quality_and_route_data())

        # WHEN
        actual_id_sequence = route64_obj.id_sequence

        # THEN
        self.assertEqual(id_sequence, actual_id_sequence)

    def test_should_return_router_id_mask_value_when_router_id_mask_property_is_called(self):
        # GIVEN
        router_id_mask = any_router_id_mask()

        route64_obj = mle.Route64(any_id_sequence(), router_id_mask, any_link_quality_and_route_data())

        # WHEN
        actual_router_id_mask = route64_obj.router_id_mask

        # THEN
        self.assertEqual(router_id_mask, actual_router_id_mask)

    def test_should_return_link_quality_and_route_data_value_when_link_quality_and_route_data_property_is_called(self):
        # GIVEN
        link_quality_and_route_data = any_link_quality_and_route_data()

        route64_obj = mle.Route64(any_id_sequence(), any_router_id_mask(), link_quality_and_route_data)

        # WHEN
        actual_link_quality_and_route_data = route64_obj.link_quality_and_route_data

        # THEN
        self.assertEqual(link_quality_and_route_data, actual_link_quality_and_route_data)


class TestRoute64Factory(unittest.TestCase):
    """Checks that mle.Route64Factory.parse builds a mle.Route64 from its serialized form."""

    def test_should_create_Route64_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        class DummyLQRDFactory:
            # Stand-in sub-parser: consumes one byte and returns it as a plain integer.

            def parse(self, data, context):
                return ord(data.read(1))

        id_sequence = any_id_sequence()
        router_id_mask = any_router_id_mask()

        # One link-quality/route-data byte is supplied for every bit set in the 64-bit mask.
        router_count = bin(router_id_mask).count("1")

        link_quality_and_route_data = any_link_quality_and_route_data(router_count)

        factory = mle.Route64Factory(DummyLQRDFactory())

        data = bytearray([id_sequence]) + struct.pack(">Q", router_id_mask) + bytearray(link_quality_and_route_data)

        # WHEN
        actual_route64 = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_route64, mle.Route64)
        self.assertEqual(id_sequence, actual_route64.id_sequence)
        self.assertEqual(router_id_mask, actual_route64.router_id_mask)
        self.assertEqual(list(link_quality_and_route_data), actual_route64.link_quality_and_route_data)


class TestAddress16(unittest.TestCase):
    """Checks that mle.Address16 exposes the address it was constructed with."""

    def test_should_return_address_value_when_address_property_is_called(self):
        # GIVEN
        address = any_address()

        address16 = mle.Address16(address)

        # WHEN
        actual_address = address16.address

        # THEN
        self.assertEqual(address, actual_address)


class TestAddress16Factory(unittest.TestCase):
    """Checks that mle.Address16Factory.parse builds a mle.Address16 from packed bytes."""

    def test_should_create_Address16_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        address = any_address()

        factory = mle.Address16Factory()

        # The address is fed to the parser as a big-endian unsigned 16-bit value.
        data = struct.pack(">H", address)

        # WHEN
        actual_address16 = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_address16, mle.Address16)
        self.assertEqual(address, actual_address16.address)


class TestLeaderData(unittest.TestCase):
    """Checks that mle.LeaderData exposes each of the five values it was constructed with."""

    def test_should_return_partition_id_value_when_partition_id_property_is_called(self):
        # GIVEN
        partition_id = any_partition_id()

        leader_data = mle.LeaderData(partition_id, any_weighting(), any_data_version(), any_stable_data_version(),
                                     any_leader_router_id())

        # WHEN
        actual_partition_id = leader_data.partition_id

        # THEN
        self.assertEqual(partition_id, actual_partition_id)

    def test_should_return_weighting_value_when_weighting_property_is_called(self):
        # GIVEN
        weighting = any_weighting()

        leader_data = mle.LeaderData(any_partition_id(), weighting, any_data_version(), any_stable_data_version(),
                                     any_leader_router_id())

        # WHEN
        actual_weighting = leader_data.weighting

        # THEN
        self.assertEqual(weighting, actual_weighting)

    def test_should_return_data_version_value_when_data_version_property_is_called(self):
        # GIVEN
        data_version = any_data_version()

        leader_data = mle.LeaderData(any_partition_id(), any_weighting(), data_version, any_stable_data_version(),
                                     any_leader_router_id())

        # WHEN
        actual_data_version = leader_data.data_version

        # THEN
        self.assertEqual(data_version, actual_data_version)

    def test_should_return_stable_data_version_value_when_stable_data_version_property_is_called(self):
        # GIVEN
        stable_data_version = any_stable_data_version()

        leader_data = mle.LeaderData(any_partition_id(), any_weighting(), any_data_version(), stable_data_version,
                                     any_leader_router_id())

        # WHEN
        actual_stable_data_version = leader_data.stable_data_version

        # THEN
        self.assertEqual(stable_data_version, actual_stable_data_version)

    def test_should_return_leader_router_id_value_when_leader_router_id_property_is_called(self):
        # GIVEN
        leader_router_id = any_leader_router_id()

        leader_data = mle.LeaderData(any_partition_id(), any_weighting(), any_data_version(),
                                     any_stable_data_version(), leader_router_id)

        # WHEN
        actual_leader_router_id = leader_data.leader_router_id

        # THEN
        self.assertEqual(leader_router_id, actual_leader_router_id)


class TestLeaderDataFactory(unittest.TestCase):
    """Checks that mle.LeaderDataFactory.parse builds a mle.LeaderData from its serialized form."""

    # NOTE(review): the method name says "Address16" although it exercises LeaderDataFactory; the name is kept
    # unchanged so the test ID stays stable.
    def test_should_create_Address16_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        partition_id = any_partition_id()
        weighting = any_weighting()
        data_version = any_data_version()
        stable_data_version = any_stable_data_version()
        leader_router_id = any_leader_router_id()

        factory = mle.LeaderDataFactory()

        # Big-endian 32-bit partition id followed by the four single-byte fields.
        data = bytearray(struct.pack(">I", partition_id)) + \
            bytearray([weighting, data_version, stable_data_version, leader_router_id])

        # WHEN
        actual_leader_data = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_leader_data, mle.LeaderData)
        self.assertEqual(partition_id, actual_leader_data.partition_id)
        self.assertEqual(weighting, actual_leader_data.weighting)
        self.assertEqual(data_version, actual_leader_data.data_version)
        self.assertEqual(stable_data_version, actual_leader_data.stable_data_version)
        self.assertEqual(leader_router_id, actual_leader_data.leader_router_id)


class TestNetworkData(unittest.TestCase):
    """Checks that mle.NetworkData exposes the tlvs it was constructed with."""

    def test_should_return_tlvs_value_when_tlvs_property_is_called(self):
        # GIVEN
        tlvs = any_tlvs()

        network_data = mle.NetworkData(tlvs)

        # WHEN
        actual_tlvs = network_data.tlvs

        # THEN
        self.assertEqual(tlvs, actual_tlvs)


class TestNetworkDataFactory(unittest.TestCase):
    """Checks that mle.NetworkDataFactory.parse builds a mle.NetworkData using the injected TLV parser."""

    # NOTE(review): the method name says "TlvRequest" although it exercises NetworkDataFactory; the name is kept
    # unchanged so the test ID stays stable.
    def test_should_create_TlvRequest_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        class DummyNetworkTlvsFactory:
            # Stand-in sub-parser: returns all remaining bytes as a list of integers.

            def parse(self, data, context):
                return list(bytearray(data.read()))

        tlvs = any_tlvs()

        factory = mle.NetworkDataFactory(DummyNetworkTlvsFactory())

        data = bytearray(tlvs)

        # WHEN
        actual_network_data = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_network_data, mle.NetworkData)
        self.assertEqual(tlvs, actual_network_data.tlvs)


class TestTlvRequest(unittest.TestCase):
    """Checks that mle.TlvRequest exposes the tlvs it was constructed with."""

    def test_should_return_tlvs_value_when_tlvs_property_is_called(self):
        # GIVEN
        tlvs = any_tlvs()

        tlv_request = mle.TlvRequest(tlvs)

        # WHEN
        actual_tlvs = tlv_request.tlvs

        # THEN
        self.assertEqual(tlvs, actual_tlvs)


class TestTlvRequestFactory(unittest.TestCase):
    """Checks that mle.TlvRequestFactory.parse builds a mle.TlvRequest from raw bytes."""

    def test_should_create_TlvRequest_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        tlvs = any_tlvs()

        factory = mle.TlvRequestFactory()

        data = bytearray(tlvs)

        # WHEN
        actual_tlv_request = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_tlv_request, mle.TlvRequest)
        self.assertEqual(tlvs, actual_tlv_request.tlvs)


class TestScanMask(unittest.TestCase):
    """Checks that mle.ScanMask exposes the router and end_device values it was constructed with."""

    def test_should_return_router_value_when_router_property_is_called(self):
        # GIVEN
        router = any_scan_mask_router()

        scan_mask = mle.ScanMask(router, any_scan_mask_end_device())

        # WHEN
        actual_router = scan_mask.router

        # THEN
        self.assertEqual(router, actual_router)

    def test_should_return_end_device_value_when_end_device_property_is_called(self):
        # GIVEN
        end_device = any_scan_mask_end_device()

        scan_mask = mle.ScanMask(any_scan_mask_router(), end_device)

        # WHEN
        actual_end_device = scan_mask.end_device

        # THEN
        self.assertEqual(end_device, actual_end_device)


class TestScanMaskFactory(unittest.TestCase):
    """Checks that mle.ScanMaskFactory.parse decodes a scan mask byte into the fields listed in scan_mask_map."""

    def test_should_create_ScanMask_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        scan_mask = any_scan_mask()

        factory = mle.ScanMaskFactory()

        data = bytearray([scan_mask])

        # WHEN
        actual_scan_mask = factory.parse(io.BytesIO(data), dict())

        # THEN
        expected = scan_mask_map[scan_mask]

        self.assertIsInstance(actual_scan_mask, mle.ScanMask)
        self.assertEqual(expected["router"], actual_scan_mask.router)
        self.assertEqual(expected["end_device"], actual_scan_mask.end_device)


class TestConnectivity(unittest.TestCase):
    """Checks that mle.Connectivity exposes each constructor value; pp is compared through common.map_pp."""

    def test_should_return_pp_value_when_pp_property_is_called(self):
        # GIVEN
        pp_byte = any_pp()

        connectivity_obj = mle.Connectivity(pp_byte, any_link_quality_3(), any_link_quality_2(), any_link_quality_1(),
                                            any_leader_cost(), any_id_sequence(), any_active_routers(),
                                            any_sed_buffer_size(), any_sed_datagram_count())

        # WHEN
        actual_pp = connectivity_obj.pp

        # THEN
        self.assertEqual(common.map_pp(pp_byte), actual_pp)

    def test_should_return_link_quality_3_value_when_link_quality_3_property_is_called(self):
        # GIVEN
        link_quality_3 = any_link_quality_3()

        connectivity_obj = mle.Connectivity(any_pp(), link_quality_3, any_link_quality_2(), any_link_quality_1(),
                                            any_leader_cost(), any_id_sequence(), any_active_routers(),
                                            any_sed_buffer_size(), any_sed_datagram_count())

        # WHEN
        actual_link_quality_3 = connectivity_obj.link_quality_3

        # THEN
        self.assertEqual(link_quality_3, actual_link_quality_3)

    def test_should_return_link_quality_2_value_when_link_quality_2_property_is_called(self):
        # GIVEN
        link_quality_2 = any_link_quality_2()

        connectivity_obj = mle.Connectivity(any_pp(), any_link_quality_3(), link_quality_2, any_link_quality_1(),
                                            any_leader_cost(), any_id_sequence(), any_active_routers(),
                                            any_sed_buffer_size(), any_sed_datagram_count())

        # WHEN
        actual_link_quality_2 = connectivity_obj.link_quality_2

        # THEN
        self.assertEqual(link_quality_2, actual_link_quality_2)

    def test_should_return_link_quality_1_value_when_link_quality_1_property_is_called(self):
        # GIVEN
        link_quality_1 = any_link_quality_1()

        connectivity_obj = mle.Connectivity(any_pp(), any_link_quality_3(), any_link_quality_2(), link_quality_1,
                                            any_leader_cost(), any_id_sequence(), any_active_routers(),
                                            any_sed_buffer_size(), any_sed_datagram_count())

        # WHEN
        actual_link_quality_1 = connectivity_obj.link_quality_1

        # THEN
        self.assertEqual(link_quality_1, actual_link_quality_1)

    def test_should_return_leader_cost_value_when_leader_cost_property_is_called(self):
        # GIVEN
        leader_cost = any_leader_cost()

        connectivity_obj = mle.Connectivity(any_pp(), any_link_quality_3(), any_link_quality_2(),
                                            any_link_quality_1(), leader_cost, any_id_sequence(), any_active_routers(),
                                            any_sed_buffer_size(), any_sed_datagram_count())

        # WHEN
        actual_leader_cost = connectivity_obj.leader_cost

        # THEN
        self.assertEqual(leader_cost, actual_leader_cost)

    def test_should_return_id_sequence_value_when_id_sequence_property_is_called(self):
        # GIVEN
        id_sequence = any_id_sequence()

        connectivity_obj = mle.Connectivity(any_pp(), any_link_quality_3(), any_link_quality_2(), any_link_quality_1(),
                                            any_leader_cost(), id_sequence, any_active_routers(),
                                            any_sed_buffer_size(), any_sed_datagram_count())

        # WHEN
        actual_id_sequence = connectivity_obj.id_sequence

        # THEN
        self.assertEqual(id_sequence, actual_id_sequence)

    def test_should_return_active_routers_value_when_active_routers_property_is_called(self):
        # GIVEN
        active_routers = any_active_routers()

        connectivity_obj = mle.Connectivity(any_pp(), any_link_quality_3(), any_link_quality_2(), any_link_quality_1(),
                                            any_leader_cost(), any_id_sequence(), active_routers,
                                            any_sed_buffer_size(), any_sed_datagram_count())

        # WHEN
        actual_active_routers = connectivity_obj.active_routers

        # THEN
        self.assertEqual(active_routers, actual_active_routers)

    def test_should_return_sed_buffer_size_value_when_sed_buffer_size_property_is_called(self):
        # GIVEN
        sed_buffer_size = any_sed_buffer_size()

        connectivity_obj = mle.Connectivity(any_pp(), any_link_quality_3(), any_link_quality_2(), any_link_quality_1(),
                                            any_leader_cost(), any_id_sequence(), any_active_routers(),
                                            sed_buffer_size, any_sed_datagram_count())

        # WHEN
        actual_sed_buffer_size = connectivity_obj.sed_buffer_size

        # THEN
        self.assertEqual(sed_buffer_size, actual_sed_buffer_size)

    def test_should_return_sed_datagram_count_value_when_sed_datagram_count_property_is_called(self):
        # GIVEN
        sed_datagram_count = any_sed_datagram_count()

        connectivity_obj = mle.Connectivity(any_pp(), any_link_quality_3(), any_link_quality_2(), any_link_quality_1(),
                                            any_leader_cost(), any_id_sequence(), any_active_routers(),
                                            any_sed_buffer_size(), sed_datagram_count)

        # WHEN
        actual_sed_datagram_count = connectivity_obj.sed_datagram_count

        # THEN
        self.assertEqual(sed_datagram_count, actual_sed_datagram_count)


class TestConnectivityFactory(unittest.TestCase):
    """Checks that mle.ConnectivityFactory.parse builds a mle.Connectivity with and without the trailing SED fields."""

    def test_should_create_Connectivity_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        pp_byte = any_pp()
        link_quality_3 = any_link_quality_3()
        link_quality_2 = any_link_quality_2()
        link_quality_1 = any_link_quality_1()
        leader_cost = any_leader_cost()
        id_sequence = any_id_sequence()
        active_routers = any_active_routers()
        sed_buffer_size = any_sed_buffer_size()
        sed_datagram_count = any_sed_datagram_count()

        factory = mle.ConnectivityFactory()

        # Seven single-byte fields, then a big-endian 16-bit buffer size and a one-byte datagram count.
        data = bytearray([
            pp_byte, link_quality_3, link_quality_2, link_quality_1, leader_cost, id_sequence, active_routers
        ]) + struct.pack(">H", sed_buffer_size) + bytearray([sed_datagram_count])

        # WHEN
        actual_connectivity = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_connectivity, mle.Connectivity)
        self.assertEqual(common.map_pp(pp_byte), actual_connectivity.pp)
        self.assertEqual(link_quality_3, actual_connectivity.link_quality_3)
        self.assertEqual(link_quality_2, actual_connectivity.link_quality_2)
        self.assertEqual(link_quality_1, actual_connectivity.link_quality_1)
        self.assertEqual(leader_cost, actual_connectivity.leader_cost)
        self.assertEqual(id_sequence, actual_connectivity.id_sequence)
        self.assertEqual(active_routers, actual_connectivity.active_routers)
        self.assertEqual(sed_buffer_size, actual_connectivity.sed_buffer_size)
        self.assertEqual(sed_datagram_count, actual_connectivity.sed_datagram_count)

    def test_should_create_Connectivity_without_sed_data_when_parse_method_is_called(self):
        # GIVEN
        pp_byte = any_pp()
        link_quality_3 = any_link_quality_3()
        link_quality_2 = any_link_quality_2()
        link_quality_1 = any_link_quality_1()
        leader_cost = any_leader_cost()
        id_sequence = any_id_sequence()
        active_routers = any_active_routers()

        factory = mle.ConnectivityFactory()

        # Only the seven leading single-byte fields are serialized; the SED fields are left out on purpose.
        data = bytearray(
            [pp_byte, link_quality_3, link_quality_2, link_quality_1, leader_cost, id_sequence, active_routers])

        # WHEN
        actual_connectivity = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_connectivity, mle.Connectivity)
        self.assertEqual(common.map_pp(pp_byte), actual_connectivity.pp)
        self.assertEqual(link_quality_3, actual_connectivity.link_quality_3)
        self.assertEqual(link_quality_2, actual_connectivity.link_quality_2)
        self.assertEqual(link_quality_1, actual_connectivity.link_quality_1)
        self.assertEqual(leader_cost, actual_connectivity.leader_cost)
        self.assertEqual(id_sequence, actual_connectivity.id_sequence)
        self.assertEqual(active_routers, actual_connectivity.active_routers)
        self.assertIsNone(actual_connectivity.sed_buffer_size)
        self.assertIsNone(actual_connectivity.sed_datagram_count)


class TestLinkMargin(unittest.TestCase):
    """Checks that mle.LinkMargin exposes the link margin it was constructed with."""

    def test_should_return_link_margin_value_when_link_margin_property_is_called(self):
        # GIVEN
        link_margin = any_link_margin()

        link_margin_obj = mle.LinkMargin(link_margin)

        # WHEN
        actual_link_margin = link_margin_obj.link_margin

        # THEN
        self.assertEqual(link_margin, actual_link_margin)


class TestLinkMarginFactory(unittest.TestCase):
    """Checks that mle.LinkMarginFactory.parse builds a mle.LinkMargin from a single byte."""

    def test_should_create_LinkMargin_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        link_margin = any_link_margin()

        factory = mle.LinkMarginFactory()

        data = bytearray([link_margin])

        # WHEN
        actual_link_margin = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_link_margin, mle.LinkMargin)
        self.assertEqual(link_margin, actual_link_margin.link_margin)


class TestStatus(unittest.TestCase):
    """Checks that mle.Status exposes the status it was constructed with."""

    def test_should_return_status_value_when_status_property_is_called(self):
        # GIVEN
        status = any_status()

        status_obj = mle.Status(status)

        # WHEN
        actual_status = status_obj.status

        # THEN
        self.assertEqual(status, actual_status)


class TestStatusFactory(unittest.TestCase):
    """Checks that mle.StatusFactory.parse builds a mle.Status from a single byte."""

    def test_should_create_Status_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        status = any_status()

        factory = mle.StatusFactory()

        data = bytearray([status])

        # WHEN
        actual_status = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_status, mle.Status)
        self.assertEqual(status, actual_status.status)


class TestVersion(unittest.TestCase):
    """Checks that mle.Version exposes the version it was constructed with."""

    def test_should_return_version_value_when_version_property_is_called(self):
        # GIVEN
        version = any_version()

        version_obj = mle.Version(version)

        # WHEN
        actual_version = version_obj.version

        # THEN
        self.assertEqual(version, actual_version)


class TestVersionFactory(unittest.TestCase):

    def test_should_create_Version_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        version = any_version()

        factory = mle.VersionFactory()

        data = struct.pack(">H", version)

        # WHEN
1338 actual_version = factory.parse(io.BytesIO(data), dict()) 1339 1340 # THEN 1341 self.assertTrue(isinstance(actual_version, mle.Version)) 1342 self.assertEqual(version, actual_version.version) 1343 1344 1345class TestAddressRegistrationFull(unittest.TestCase): 1346 1347 def test_should_return_ipv6_address_value_when_ipv6_address_property_is_called(self): 1348 # GIVEN 1349 ipv6_address = any_ipv6_address() 1350 1351 addr_reg_full_obj = mle.AddressFull(ipv6_address) 1352 1353 # WHEN 1354 actual_ipv6_address = addr_reg_full_obj.ipv6_address 1355 1356 # THEN 1357 self.assertEqual(ipv6_address, actual_ipv6_address) 1358 1359 1360class TestAddressRegistrationFullFactory(unittest.TestCase): 1361 1362 def test_should_create_AddressFull_from_bytearray_when_parse_method_is_called(self): 1363 # GIVEN 1364 ipv6_address = any_ipv6_address() 1365 1366 factory = mle.AddressFullFactory() 1367 1368 data = bytearray([0x00]) + bytearray(ipv6_address) 1369 1370 # WHEN 1371 actual_addr_reg_full = factory.parse(io.BytesIO(data), dict()) 1372 1373 # THEN 1374 self.assertTrue(isinstance(actual_addr_reg_full, mle.AddressFull)) 1375 self.assertEqual(ipv6_address, actual_addr_reg_full.ipv6_address) 1376 1377 1378class TestAddressRegistrationCompressed(unittest.TestCase): 1379 1380 def test_should_return_cid_value_when_cid_property_is_called(self): 1381 # GIVEN 1382 cid = any_cid() 1383 1384 addr_reg_compressed_obj = mle.AddressCompressed(cid, any_iid()) 1385 1386 # WHEN 1387 actual_cid = addr_reg_compressed_obj.cid 1388 1389 # THEN 1390 self.assertEqual(cid, actual_cid) 1391 1392 def test_should_return_cid_value_when_iid_property_is_called(self): 1393 # GIVEN 1394 iid = any_iid() 1395 1396 addr_reg_compressed_obj = mle.AddressCompressed(any_cid(), iid) 1397 1398 # WHEN 1399 actual_iid = addr_reg_compressed_obj.iid 1400 1401 # THEN 1402 self.assertEqual(iid, actual_iid) 1403 1404 1405class TestAddressRegistrationCompressedFactory(unittest.TestCase): 1406 1407 def 
test_should_create_AddressRegistrationCompressed_from_bytearray_when_parse_method_is_called(self): 1408 # GIVEN 1409 cid = any_cid() 1410 iid = any_iid() 1411 1412 factory = mle.AddressCompressedFactory() 1413 1414 data = bytearray([(1 << 7) | cid]) + iid 1415 1416 # WHEN 1417 actual_addr_reg_compressed = factory.parse(io.BytesIO(data), dict()) 1418 1419 # THEN 1420 self.assertTrue(isinstance(actual_addr_reg_compressed, mle.AddressCompressed)) 1421 self.assertEqual(cid, actual_addr_reg_compressed.cid) 1422 self.assertEqual(iid, actual_addr_reg_compressed.iid) 1423 1424 1425class TestAddressRegistration(unittest.TestCase): 1426 1427 def test_should_return_addresses_value_when_addresses_property_is_called(self): 1428 # GIVEN 1429 addresses = any_addresses() 1430 1431 addr_reg_obj = mle.AddressRegistration(addresses) 1432 1433 # WHEN 1434 actual_addresses = addr_reg_obj.addresses 1435 1436 # THEN 1437 self.assertEqual(addresses, actual_addresses) 1438 1439 1440class TestAddressRegistrationFactory(unittest.TestCase): 1441 1442 def test_should_create_AddressRegistration_from_bytearray_when_parse_method_is_called(self): 1443 # GIVEN 1444 cid = any_cid() 1445 iid = any_iid() 1446 ipv6_address = any_ipv6_address() 1447 1448 addresses = [mle.AddressCompressed(cid, iid), mle.AddressFull(ipv6_address)] 1449 1450 factory = mle.AddressRegistrationFactory(mle.AddressCompressedFactory(), mle.AddressFullFactory()) 1451 1452 data = bytearray([(1 << 7) | cid]) + iid + bytearray([0]) + ipv6_address 1453 1454 # WHEN 1455 actual_addr_reg = factory.parse(io.BytesIO(data), dict()) 1456 1457 # THEN 1458 self.assertTrue(isinstance(actual_addr_reg, mle.AddressRegistration)) 1459 self.assertEqual(addresses[0].cid, actual_addr_reg.addresses[0].cid) 1460 self.assertEqual(addresses[0].iid, actual_addr_reg.addresses[0].iid) 1461 self.assertEqual(addresses[1].ipv6_address, actual_addr_reg.addresses[1].ipv6_address) 1462 1463 1464class TestChannel(unittest.TestCase): 1465 1466 def 
test_should_return_channel_page_value_when_channel_page_property_is_called(self): 1467 # GIVEN 1468 channel_page = any_channel_page() 1469 1470 channel_obj = mle.Channel(channel_page, any_channel()) 1471 1472 # WHEN 1473 actual_channel_page = channel_obj.channel_page 1474 1475 # THEN 1476 self.assertEqual(channel_page, actual_channel_page) 1477 1478 def test_should_return_channel_value_when_channel_property_is_called(self): 1479 # GIVEN 1480 channel = any_channel() 1481 1482 channel_obj = mle.Channel(any_channel_page(), channel) 1483 1484 # WHEN 1485 actual_channel = channel_obj.channel 1486 1487 # THEN 1488 self.assertEqual(channel, actual_channel) 1489 1490 1491class TestChannelFactory(unittest.TestCase): 1492 1493 def test_should_create_Channel_from_bytearray_when_parse_method_is_called(self): 1494 # GIVEN 1495 channel_page = any_channel_page() 1496 channel = any_channel() 1497 1498 factory = mle.ChannelFactory() 1499 1500 data = bytearray([channel_page]) + struct.pack(">H", channel) 1501 1502 # WHEN 1503 actual_channel = factory.parse(io.BytesIO(data), dict()) 1504 1505 # THEN 1506 self.assertTrue(isinstance(actual_channel, mle.Channel)) 1507 self.assertEqual(channel_page, actual_channel.channel_page) 1508 self.assertEqual(channel, actual_channel.channel) 1509 1510 1511class TestPanId(unittest.TestCase): 1512 1513 def test_should_return_pan_id_value_when_pan_id_property_is_called(self): 1514 # GIVEN 1515 pan_id = any_pan_id() 1516 1517 pan_id_obj = mle.PanId(pan_id) 1518 1519 # WHEN 1520 actual_pan_id = pan_id_obj.pan_id 1521 1522 # THEN 1523 self.assertEqual(pan_id, actual_pan_id) 1524 1525 1526class TestPanIdFactory(unittest.TestCase): 1527 1528 def test_should_create_PanId_from_bytearray_when_parse_method_is_called(self): 1529 # GIVEN 1530 pan_id = any_pan_id() 1531 1532 factory = mle.PanIdFactory() 1533 1534 data = struct.pack(">H", pan_id) 1535 1536 # WHEN 1537 actual_pan_id = factory.parse(io.BytesIO(data), dict()) 1538 1539 # THEN 1540 
self.assertTrue(isinstance(actual_pan_id, mle.PanId)) 1541 self.assertEqual(pan_id, actual_pan_id.pan_id) 1542 1543 1544class TestActiveTimestamp(unittest.TestCase): 1545 1546 def test_should_return_timestamp_seconds_value_when_timestamp_seconds_property_is_called(self): 1547 # GIVEN 1548 timestamp_seconds = any_timestamp_seconds() 1549 1550 active_timestamp_obj = mle.ActiveTimestamp(timestamp_seconds, any_timestamp_ticks(), any_u()) 1551 1552 # WHEN 1553 actual_timestamp_seconds = active_timestamp_obj.timestamp_seconds 1554 1555 # THEN 1556 self.assertEqual(timestamp_seconds, actual_timestamp_seconds) 1557 1558 def test_should_return_timestamp_ticks_value_when_timestamp_ticks_property_is_called(self): 1559 # GIVEN 1560 timestamp_ticks = any_timestamp_ticks() 1561 1562 active_timestamp_obj = mle.ActiveTimestamp(any_timestamp_seconds(), timestamp_ticks, any_u()) 1563 1564 # WHEN 1565 actual_timestamp_ticks = active_timestamp_obj.timestamp_ticks 1566 1567 # THEN 1568 self.assertEqual(timestamp_ticks, actual_timestamp_ticks) 1569 1570 def test_should_return_u_value_when_u_property_is_called(self): 1571 # GIVEN 1572 u = any_u() 1573 1574 active_timestamp_obj = mle.ActiveTimestamp(any_timestamp_seconds(), any_timestamp_ticks(), u) 1575 1576 # WHEN 1577 actual_u = active_timestamp_obj.u 1578 1579 # THEN 1580 self.assertEqual(u, actual_u) 1581 1582 1583class TestActiveTimestampFactory(unittest.TestCase): 1584 1585 def test_should_create_ActiveTimestamp_from_bytearray_when_parse_method_is_called(self): 1586 # GIVEN 1587 timestamp_seconds = any_timestamp_seconds() 1588 timestamp_ticks = any_timestamp_ticks() 1589 u = any_u() 1590 1591 factory = mle.ActiveTimestampFactory() 1592 1593 data = struct.pack(">Q", timestamp_seconds)[2:] + struct.pack(">H", (timestamp_ticks << 1) | u) 1594 1595 # WHEN 1596 active_timestamp = factory.parse(io.BytesIO(data), dict()) 1597 1598 # THEN 1599 self.assertTrue(isinstance(active_timestamp, mle.ActiveTimestamp)) 1600 
self.assertEqual(timestamp_seconds, active_timestamp.timestamp_seconds) 1601 self.assertEqual(timestamp_ticks, active_timestamp.timestamp_ticks) 1602 self.assertEqual(u, active_timestamp.u) 1603 1604 1605class TestPendingTimestamp(unittest.TestCase): 1606 1607 def test_should_return_timestamp_seconds_value_when_timestamp_seconds_property_is_called(self): 1608 # GIVEN 1609 timestamp_seconds = any_timestamp_seconds() 1610 1611 pending_timestamp_obj = mle.PendingTimestamp(timestamp_seconds, any_timestamp_ticks(), any_u()) 1612 1613 # WHEN 1614 actual_timestamp_seconds = pending_timestamp_obj.timestamp_seconds 1615 1616 # THEN 1617 self.assertEqual(timestamp_seconds, actual_timestamp_seconds) 1618 1619 def test_should_return_timestamp_ticks_value_when_timestamp_ticks_property_is_called(self): 1620 # GIVEN 1621 timestamp_ticks = any_timestamp_ticks() 1622 1623 pending_timestamp_obj = mle.PendingTimestamp(any_timestamp_seconds(), timestamp_ticks, any_u()) 1624 1625 # WHEN 1626 actual_timestamp_ticks = pending_timestamp_obj.timestamp_ticks 1627 1628 # THEN 1629 self.assertEqual(timestamp_ticks, actual_timestamp_ticks) 1630 1631 def test_should_return_u_value_when_u_property_is_called(self): 1632 # GIVEN 1633 u = any_u() 1634 1635 pending_timestamp_obj = mle.PendingTimestamp(any_timestamp_seconds(), any_timestamp_ticks(), u) 1636 1637 # WHEN 1638 actual_u = pending_timestamp_obj.u 1639 1640 # THEN 1641 self.assertEqual(u, actual_u) 1642 1643 1644class TestPendingTimestampFactory(unittest.TestCase): 1645 1646 def test_should_create_PendingTimestamp_from_bytearray_when_parse_method_is_called(self): 1647 # GIVEN 1648 timestamp_seconds = any_timestamp_seconds() 1649 timestamp_ticks = any_timestamp_ticks() 1650 u = any_u() 1651 1652 factory = mle.PendingTimestampFactory() 1653 1654 data = struct.pack(">Q", timestamp_seconds)[2:] + struct.pack(">H", (timestamp_ticks << 1) | u) 1655 1656 # WHEN 1657 pending_timestamp = factory.parse(io.BytesIO(data), dict()) 1658 1659 # THEN 1660 
self.assertTrue(isinstance(pending_timestamp, mle.PendingTimestamp)) 1661 self.assertEqual(timestamp_seconds, pending_timestamp.timestamp_seconds) 1662 self.assertEqual(timestamp_ticks, pending_timestamp.timestamp_ticks) 1663 self.assertEqual(u, pending_timestamp.u) 1664 1665 1666class TestMleCommandFactory(unittest.TestCase): 1667 1668 def test_should_create_MleCommand_from_bytearray_when_parse_method_is_called(self): 1669 data = bytearray([ 1670 0x0b, 0x04, 0x08, 0xa5, 0xf2, 0x9b, 0xde, 0xe3, 0xd8, 0xbe, 0xb9, 0x05, 0x04, 0x00, 0x00, 0x00, 0x00, 0x08, 1671 0x04, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x0d, 0x02, 0x04, 0x00, 0x00, 0x00, 0xf0, 0x12, 0x02, 0x00, 0x02, 1672 0x13, 0x09, 0x80, 0x86, 0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8, 0x0d, 0x03, 0x0a, 0x0c, 0x09 1673 ]) 1674 1675 factory = mle.MleCommandFactory(config.create_default_mle_tlvs_factories()) 1676 1677 # WHEN 1678 actual_mle_command = factory.parse(io.BytesIO(data), None) 1679 1680 # THEN 1681 self.assertTrue(isinstance(actual_mle_command, mle.MleCommand)) 1682 1683 self.assertEqual(11, actual_mle_command.type) 1684 1685 self.assertEqual(mle.Response(bytearray([0xa5, 0xf2, 0x9b, 0xde, 0xe3, 0xd8, 0xbe, 0xb9])), 1686 actual_mle_command.tlvs[0]) 1687 1688 self.assertEqual(mle.LinkLayerFrameCounter(0), actual_mle_command.tlvs[1]) 1689 1690 self.assertEqual(mle.MleFrameCounter(1), actual_mle_command.tlvs[2]) 1691 1692 self.assertEqual(mle.Mode(receiver=1, secure=1, device_type=0, network_data=1), actual_mle_command.tlvs[3]) 1693 1694 self.assertEqual(mle.Timeout(240), actual_mle_command.tlvs[4]) 1695 1696 self.assertEqual(mle.Version(2), actual_mle_command.tlvs[5]) 1697 1698 self.assertEqual( 1699 mle.AddressRegistration(addresses=[ 1700 mle.AddressCompressed(cid=0, iid=bytearray([0x86, 0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8])) 1701 ]), actual_mle_command.tlvs[6]) 1702 1703 self.assertEqual(mle.TlvRequest(tlvs=[10, 12, 9]), actual_mle_command.tlvs[7]) 1704 1705 1706class 
TestMleMessageFactory(unittest.TestCase): 1707 1708 def test_should_create_MleMessageSecured_from_bytearray_when_parse_method_is_called(self): 1709 # GIVEN 1710 message_info = common.MessageInfo() 1711 message_info.source_ipv6 = "fe80::10cf:d38b:3b61:5558" 1712 message_info.destination_ipv6 = "fe80::383e:9eed:7a01:36a5" 1713 1714 message_info.source_mac_address = common.MacAddress.from_eui64( 1715 bytearray([0x12, 0xcf, 0xd3, 0x8b, 0x3b, 0x61, 0x55, 0x58])) 1716 message_info.destination_mac_address = common.MacAddress.from_eui64( 1717 bytearray([0x3a, 0x3e, 0x9e, 0xed, 0x7a, 0x01, 0x36, 0xa5])) 1718 1719 data = bytearray([ 1720 0x00, 0x15, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x14, 0x03, 0xe3, 0x72, 0x50, 0x4f, 0x8c, 1721 0x5c, 0x42, 0x81, 0x68, 0xe2, 0x11, 0xfc, 0xf5, 0x8c, 0x62, 0x8e, 0x83, 0x99, 0xe7, 0x26, 0x86, 0x34, 0x3b, 1722 0xa7, 0x68, 0xc7, 0x93, 0xfb, 0x72, 0xd9, 0xcc, 0x13, 0x5e, 0x5b, 0x96, 0x0e, 0xf1, 0x80, 0x03, 0x55, 0x4f, 1723 0x27, 0xc2, 0x96, 0xf4, 0x9c, 0x65, 0x82, 0x97, 0xcf, 0x97, 0x35, 0x89, 0xc2 1724 ]) 1725 1726 factory = config.create_default_mle_message_factory(network_key=config.DEFAULT_NETWORK_KEY) 1727 1728 # WHEN 1729 actual_mle_message = factory.parse(io.BytesIO(data), message_info) 1730 1731 # THEN 1732 self.assertTrue(isinstance(actual_mle_message, mle.MleMessageSecured)) 1733 1734 self.assertEqual(11, actual_mle_message.command.type) 1735 1736 self.assertEqual(mle.Response(bytearray([0xa5, 0xf2, 0x9b, 0xde, 0xe3, 0xd8, 0xbe, 0xb9])), 1737 actual_mle_message.command.tlvs[0]) 1738 1739 self.assertEqual(mle.LinkLayerFrameCounter(0), actual_mle_message.command.tlvs[1]) 1740 1741 self.assertEqual(mle.MleFrameCounter(1), actual_mle_message.command.tlvs[2]) 1742 1743 self.assertEqual(mle.Mode(receiver=1, secure=1, device_type=0, network_data=1), 1744 actual_mle_message.command.tlvs[3]) 1745 1746 self.assertEqual(mle.Timeout(240), actual_mle_message.command.tlvs[4]) 1747 1748 self.assertEqual(mle.Version(2), 
actual_mle_message.command.tlvs[5]) 1749 1750 self.assertEqual( 1751 mle.AddressRegistration(addresses=[ 1752 mle.AddressCompressed(cid=0, iid=bytearray([0x86, 0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8])) 1753 ]), actual_mle_message.command.tlvs[6]) 1754 1755 self.assertEqual(mle.TlvRequest(tlvs=[10, 12, 9]), actual_mle_message.command.tlvs[7]) 1756 1757 self.assertEqual(bytearray(data[-4:]), actual_mle_message.mic) 1758 1759 def test_should_create_MleMessageSecured_with_MLE_Data_Response_from_bytearray_when_parse_method_is_called(self): 1760 # GIVEN 1761 message_info = common.MessageInfo() 1762 message_info.source_ipv6 = "fe80::241c:b11b:7b62:caf1" 1763 message_info.destination_ipv6 = "ff02::1" 1764 1765 message_info.source_mac_address = common.MacAddress.from_eui64( 1766 bytearray([0x26, 0x1c, 0xb1, 0x1b, 0x7b, 0x62, 0xca, 0xf1])) 1767 message_info.destination_mac_address = common.MacAddress.from_eui64( 1768 bytearray([0x3a, 0xba, 0xad, 0xca, 0xfe, 0xde, 0xff, 0xa5])) 1769 1770 data = bytearray([ 1771 0x00, 0x15, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xca, 0xd3, 0x45, 0xe2, 0x35, 0x1d, 0x00, 1772 0x2d, 0x72, 0x71, 0xb1, 0x19, 0xaf, 0x8b, 0x05, 0xd9, 0x52, 0x74, 0xce, 0xe6, 0x36, 0x53, 0xeb, 0xc6, 0x25, 1773 0x94, 0x01, 0x6d, 0x20, 0xdf, 0x30, 0x82, 0xf8, 0xbb, 0x34, 0x47, 0x42, 0x50, 0xe9, 0x41, 0xa7, 0x33, 0xa5 1774 ]) 1775 1776 factory = config.create_default_mle_message_factory(network_key=config.DEFAULT_NETWORK_KEY) 1777 1778 # WHEN 1779 actual_mle_message = factory.parse(io.BytesIO(data), message_info) 1780 1781 # THEN 1782 self.assertTrue(isinstance(actual_mle_message, mle.MleMessageSecured)) 1783 1784 self.assertEqual(8, actual_mle_message.command.type) 1785 1786 self.assertEqual(mle.SourceAddress(address=0x9400), actual_mle_message.command.tlvs[0]) 1787 1788 self.assertEqual( 1789 mle.LeaderData(partition_id=0x06d014ca, 1790 weighting=64, 1791 data_version=131, 1792 stable_data_version=168, 1793 leader_router_id=37), 
actual_mle_message.command.tlvs[1]) 1794 1795 self.assertEqual( 1796 mle.NetworkData(tlvs=[ 1797 network_data.Prefix(domain_id=0, 1798 prefix_length=64, 1799 prefix=bytearray([0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34]), 1800 sub_tlvs=[ 1801 network_data.LowpanId(c=1, cid=1, context_length=64, stable=1), 1802 network_data.BorderRouter( 1803 border_router_16=37888, prf=0, p=1, s=1, d=0, c=0, r=1, o=1, n=0, stable=1) 1804 ], 1805 stable=1) 1806 ]), actual_mle_message.command.tlvs[2]) 1807 1808 self.assertEqual(bytearray(data[-4:]), actual_mle_message.mic) 1809 1810 1811if __name__ == "__main__": 1812 unittest.main() 1813