1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import io 31import logging 32import struct 33 34from binascii import hexlify 35 36import common 37 38from enum import IntEnum 39from tlvs_parsing import UnknownTlvFactory 40 41 42class CommandType(IntEnum): 43 LINK_REQUEST = 0 44 LINK_ACCEPT = 1 45 LINK_ACCEPT_AND_REQUEST = 2 46 LINK_REJECT = 3 47 ADVERTISEMENT = 4 48 UPDATE = 5 49 UPDATE_REQUEST = 6 50 DATA_REQUEST = 7 51 DATA_RESPONSE = 8 52 PARENT_REQUEST = 9 53 PARENT_RESPONSE = 10 54 CHILD_ID_REQUEST = 11 55 CHILD_ID_RESPONSE = 12 56 CHILD_UPDATE_REQUEST = 13 57 CHILD_UPDATE_RESPONSE = 14 58 ANNOUNCE = 15 59 DISCOVERY_REQUEST = 16 60 DISCOVERY_RESPONSE = 17 61 LINK_METRICS_MANAGEMENT_REQUEST = 18 62 LINK_METRICS_MANAGEMENT_RESPONSE = 19 63 LINK_PROBE = 20 64 TIME_SYNC = 99 65 66 67class TlvType(IntEnum): 68 SOURCE_ADDRESS = 0 69 MODE = 1 70 TIMEOUT = 2 71 CHALLENGE = 3 72 RESPONSE = 4 73 LINK_LAYER_FRAME_COUNTER = 5 74 MLE_FRAME_COUNTER = 8 75 ROUTE64 = 9 76 ADDRESS16 = 10 77 LEADER_DATA = 11 78 NETWORK_DATA = 12 79 TLV_REQUEST = 13 80 SCAN_MASK = 14 81 CONNECTIVITY = 15 82 LINK_MARGIN = 16 83 STATUS = 17 84 VERSION = 18 85 ADDRESS_REGISTRATION = 19 86 CHANNEL = 20 87 PANID = 21 88 ACTIVE_TIMESTAMP = 22 89 PENDING_TIMESTAMP = 23 90 ACTIVE_OPERATIONAL_DATASET = 24 91 PENDING_OPERATIONAL_DATASET = 25 92 THREAD_DISCOVERY = 26 93 SUPERVISION_INTERVAL = 27 94 CSL_CHANNEL = 80 95 CSL_SYNCHRONIZED_TIMEOUT = 85 96 CSL_CLOCK_ACCURACY = 86 97 LINK_METRICS_QUERY = 87 98 LINK_METRICS_MANAGEMENT = 88 99 LINK_METRICS_REPORT = 89 100 LINK_PROBE = 90 101 TIME_REQUEST = 252 102 TIME_PARAMETER = 253 103 104 105class LinkMetricsSubTlvType(IntEnum): 106 LINK_METRICS_REPORT = 0 107 LINK_METRICS_QUERY_ID = 1 108 LINK_METRICS_QUERY_OPTIONS = 2 109 FORWARD_PROBING_REGISTRATION = 3 110 LINK_METRICS_STATUS = 5 111 ENHANCED_ACK_LINK_METRICS_CONFIGURATION = 7 112 113 114class SourceAddress(object): 115 116 def __init__(self, address): 117 self._address = address 118 119 @property 120 def address(self): 121 return self._address 122 123 def __eq__(self, other): 124 common.expect_the_same_class(self, other) 125 126 return self.address == other.address 127 128 def __repr__(self): 129 return "SourceAddress(address={})".format(hex(self._address)) 130 131 132class SourceAddressFactory: 133 134 def parse(self, data, message_info): 135 address = struct.unpack(">H", data.read(2))[0] 136 return SourceAddress(address) 137 138 139class Mode(object): 140 141 def __init__(self, receiver, secure, device_type, network_data): 142 self._receiver = receiver 143 self._secure = secure 144 self._device_type = device_type 145 self._network_data = network_data 146 147 @property 148 def receiver(self): 149 return self._receiver 150 151 @property 152 def secure(self): 153 return self._secure 154 155 @property 156 def device_type(self): 157 return self._device_type 158 159 @property 160 def network_data(self): 161 return self._network_data 162 163 def __eq__(self, other): 164 common.expect_the_same_class(self, other) 165 166 return (self.receiver == other.receiver and self.secure == other.secure and 167 self.device_type == other.device_type and self.network_data == other.network_data) 168 169 def __repr__(self): 170 return "Mode(receiver={}, secure={}, device_type={}, network_data={})".format( 171 self.receiver, self.secure, self.device_type, self.network_data) 172 173 174class ModeFactory: 175 176 def parse(self, data, message_info): 177 mode = ord(data.read(1)) 178 receiver = (mode >> 3) & 0x01 179 secure = (mode >> 2) & 0x01 180 device_type = (mode >> 1) & 0x01 181 network_data = (mode >> 0) & 0x01 182 return Mode(receiver, secure, device_type, network_data) 183 184 185class Timeout(object): 186 187 def __init__(self, timeout): 188 self._timeout = timeout 189 190 @property 191 def timeout(self): 192 return self._timeout 193 194 def __eq__(self, other): 195 common.expect_the_same_class(self, other) 196 197 return self.timeout == other.timeout 198 199 def __repr__(self): 200 return "Timeout(timeout={})".format(self.timeout) 201 202 203class TimeoutFactory: 204 205 def parse(self, data, message_info): 206 timeout = struct.unpack(">I", data.read(4))[0] 207 return Timeout(timeout) 208 209 210class Challenge(object): 211 212 def __init__(self, challenge): 213 self._challenge = challenge 214 215 @property 216 def challenge(self): 217 return self._challenge 218 219 def __eq__(self, other): 220 common.expect_the_same_class(self, other) 221 222 return self.challenge == other.challenge 223 224 def __repr__(self): 225 return "Challenge(challenge={})".format(hexlify(self.challenge)) 226 227 228class ChallengeFactory: 229 230 def parse(self, data, message_info): 231 challenge = data.read() 232 return Challenge(challenge) 233 234 235class Response(object): 236 237 def __init__(self, response): 238 self._response = response 239 240 @property 241 def response(self): 242 return self._response 243 244 def __eq__(self, other): 245 common.expect_the_same_class(self, other) 246 247 return self.response == other.response 248 249 def __repr__(self): 250 return "Response(response={})".format(hexlify(self.response)) 251 252 253class ResponseFactory: 254 255 def parse(self, data, message_info): 256 response = data.read() 257 return Response(response) 258 259 260class LinkLayerFrameCounter(object): 261 262 def __init__(self, frame_counter): 263 self._frame_counter = frame_counter 264 265 @property 266 def frame_counter(self): 267 return self._frame_counter 268 269 def __eq__(self, other): 270 common.expect_the_same_class(self, other) 271 272 return self.frame_counter == other.frame_counter 273 274 def __repr__(self): 275 return "LinkLayerFrameCounter(frame_counter={})".format(self.frame_counter) 276 277 278class LinkLayerFrameCounterFactory: 279 280 def parse(self, data, message_info): 281 frame_counter = struct.unpack(">I", data.read(4))[0] 282 return LinkLayerFrameCounter(frame_counter) 283 284 285class MleFrameCounter(object): 286 287 def __init__(self, frame_counter): 288 self._frame_counter = frame_counter 289 290 @property 291 def frame_counter(self): 292 return self._frame_counter 293 294 def __eq__(self, other): 295 common.expect_the_same_class(self, other) 296 297 return self.frame_counter == other.frame_counter 298 299 def __repr__(self): 300 return "MleFrameCounter(frame_counter={})".format(self.frame_counter) 301 302 303class MleFrameCounterFactory: 304 305 def parse(self, data, message_info): 306 frame_counter = struct.unpack(">I", data.read(4))[0] 307 return MleFrameCounter(frame_counter) 308 309 310class LinkQualityAndRouteData(object): 311 312 def __init__(self, output, _input, route): 313 self._output = output 314 self._input = _input 315 self._route = route 316 317 @property 318 def output(self): 319 return self._output 320 321 @property 322 def input(self): 323 return self._input 324 325 @property 326 def route(self): 327 return self._route 328 329 def __eq__(self, other): 330 common.expect_the_same_class(self, other) 331 332 return (self.output == other.output and self.input == other.input and self.route == other.route) 333 334 def __repr__(self): 335 return "LinkQualityAndRouteData(output={}, input={}, route={})".format(self.output, self.input, self.route) 336 337 338class LinkQualityAndRouteDataFactory: 339 340 def parse(self, data, message_info): 341 lqrd = ord(data.read(1)) 342 output = (lqrd >> 6) & 0x3 343 _input = (lqrd >> 4) & 0x3 344 route = lqrd & 0x0F 345 return LinkQualityAndRouteData(output, _input, route) 346 347 348class Route64(object): 349 350 def __init__(self, id_sequence, router_id_mask, link_quality_and_route_data): 351 self._id_sequence = id_sequence 352 self._router_id_mask = router_id_mask 353 self._link_quality_and_route_data = link_quality_and_route_data 354 355 @property 356 def id_sequence(self): 357 return self._id_sequence 358 359 @property 360 def router_id_mask(self): 361 return self._router_id_mask 362 363 @property 364 def link_quality_and_route_data(self): 365 return self._link_quality_and_route_data 366 367 def __eq__(self, other): 368 common.expect_the_same_class(self, other) 369 370 return (self.id_sequence == other.id_sequence and self.router_id_mask == other.router_id_mask and 371 self.link_quality_and_route_data == other.link_quality_and_route_data) 372 373 def __repr__(self): 374 lqrd_str = ", ".join(["{}".format(lqrd) for lqrd in self.link_quality_and_route_data]) 375 return "Route64(id_sequence={}, router_id_mask={}, link_quality_and_route_data=[{}])".format( 376 self.id_sequence, hex(self.router_id_mask), lqrd_str) 377 378 379class Route64Factory: 380 381 def __init__(self, link_quality_and_route_data_factory): 382 self._lqrd_factory = link_quality_and_route_data_factory 383 384 def parse(self, data, message_info): 385 id_sequence = ord(data.read(1)) 386 router_id_mask = struct.unpack(">Q", data.read(8))[0] 387 388 link_quality_and_route_data = [] 389 390 while data.tell() < len(data.getvalue()): 391 link_quality_and_route_data.append(self._lqrd_factory.parse(data, message_info)) 392 393 return Route64(id_sequence, router_id_mask, link_quality_and_route_data) 394 395 396class Address16(object): 397 398 def __init__(self, address): 399 self._address = address 400 401 @property 402 def address(self): 403 return self._address 404 405 def __eq__(self, other): 406 common.expect_the_same_class(self, other) 407 408 return self.address == other.address 409 410 def __repr__(self): 411 return "Address16(address={})".format(hex(self.address)) 412 413 414class Address16Factory: 415 416 def parse(self, data, message_info): 417 address = struct.unpack(">H", data.read(2))[0] 418 return Address16(address) 419 420 421class LeaderData(object): 422 423 def __init__( 424 self, 425 partition_id, 426 weighting, 427 data_version, 428 stable_data_version, 429 leader_router_id, 430 ): 431 self._partition_id = partition_id 432 self._weighting = weighting 433 self._data_version = data_version 434 self._stable_data_version = stable_data_version 435 self._leader_router_id = leader_router_id 436 437 @property 438 def partition_id(self): 439 return self._partition_id 440 441 @property 442 def weighting(self): 443 return self._weighting 444 445 @property 446 def data_version(self): 447 return self._data_version 448 449 @property 450 def stable_data_version(self): 451 return self._stable_data_version 452 453 @property 454 def leader_router_id(self): 455 return self._leader_router_id 456 457 def __eq__(self, other): 458 common.expect_the_same_class(self, other) 459 460 return (self.partition_id == other.partition_id and self.weighting == other.weighting and 461 self.data_version == other.data_version and self.stable_data_version == other.stable_data_version and 462 self.leader_router_id == other.leader_router_id) 463 464 def __repr__(self): 465 return 'LeaderData(partition_id={}, weighting={}, data_version={}, stable_data_version={},leader_router_id={}'.format( 466 self.partition_id, 467 self.weighting, 468 self.data_version, 469 self.stable_data_version, 470 self.leader_router_id, 471 ) 472 473 474class LeaderDataFactory: 475 476 def parse(self, data, message_info): 477 partition_id = struct.unpack(">I", data.read(4))[0] 478 weighting = ord(data.read(1)) 479 data_version = ord(data.read(1)) 480 stable_data_version = ord(data.read(1)) 481 leader_router_id = ord(data.read(1)) 482 return LeaderData( 483 partition_id, 484 weighting, 485 data_version, 486 stable_data_version, 487 leader_router_id, 488 ) 489 490 491class NetworkData(object): 492 493 def __init__(self, tlvs): 494 self._tlvs = tlvs 495 496 @property 497 def tlvs(self): 498 return self._tlvs 499 500 def __eq__(self, other): 501 common.expect_the_same_class(self, other) 502 503 return self.tlvs == other.tlvs 504 505 def __repr__(self): 506 tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.tlvs]) 507 return "NetworkData(tlvs=[{}])".format(tlvs_str) 508 509 510class NetworkDataFactory: 511 512 def __init__(self, network_data_tlvs_factory): 513 self._tlvs_factory = network_data_tlvs_factory 514 515 def parse(self, data, message_info): 516 tlvs = self._tlvs_factory.parse(data, message_info) 517 return NetworkData(tlvs) 518 519 520class TlvRequest(object): 521 522 def __init__(self, tlvs): 523 self._tlvs = tlvs 524 525 @property 526 def tlvs(self): 527 return self._tlvs 528 529 def __eq__(self, other): 530 common.expect_the_same_class(self, other) 531 532 return self.tlvs == other.tlvs 533 534 def __repr__(self): 535 tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.tlvs]) 536 return "TlvRequest(tlvs=[{}])".format(tlvs_str) 537 538 539class TlvRequestFactory: 540 541 def parse(self, data, message_info): 542 tlvs = [b for b in bytearray(data.read())] 543 return TlvRequest(tlvs) 544 545 546class ScanMask(object): 547 548 def __init__(self, router, end_device): 549 self._router = router 550 self._end_device = end_device 551 552 @property 553 def router(self): 554 return self._router 555 556 @property 557 def end_device(self): 558 return self._end_device 559 560 def __eq__(self, other): 561 common.expect_the_same_class(self, other) 562 563 return (self.router == other.router and self.end_device == other.end_device) 564 565 def __repr__(self): 566 return "ScanMask(router={}, end_device={})".format(self.router, self.end_device) 567 568 569class ScanMaskFactory: 570 571 def parse(self, data, message_info): 572 scan_mask = ord(data.read(1)) 573 router = (scan_mask >> 7) & 0x01 574 end_device = (scan_mask >> 6) & 0x01 575 return ScanMask(router, end_device) 576 577 578class Connectivity(object): 579 580 def __init__( 581 self, 582 pp_byte, 583 link_quality_3, 584 link_quality_2, 585 link_quality_1, 586 leader_cost, 587 id_sequence, 588 active_routers, 589 sed_buffer_size=None, 590 sed_datagram_count=None, 591 ): 592 self._pp_byte = pp_byte 593 self._link_quality_3 = link_quality_3 594 self._link_quality_2 = link_quality_2 595 self._link_quality_1 = link_quality_1 596 self._leader_cost = leader_cost 597 self._id_sequence = id_sequence 598 self._active_routers = active_routers 599 self._sed_buffer_size = sed_buffer_size 600 self._sed_datagram_count = sed_datagram_count 601 602 @property 603 def pp_byte(self): 604 return self._pp_byte 605 606 @property 607 def pp(self): 608 return common.map_pp(self._pp_byte) 609 610 @property 611 def link_quality_3(self): 612 return self._link_quality_3 613 614 @property 615 def link_quality_2(self): 616 return self._link_quality_2 617 618 @property 619 def link_quality_1(self): 620 return self._link_quality_1 621 622 @property 623 def leader_cost(self): 624 return self._leader_cost 625 626 @property 627 def id_sequence(self): 628 return self._id_sequence 629 630 @property 631 def active_routers(self): 632 return self._active_routers 633 634 @property 635 def sed_buffer_size(self): 636 return self._sed_buffer_size 637 638 @property 639 def sed_datagram_count(self): 640 return self._sed_datagram_count 641 642 def __eq__(self, other): 643 common.expect_the_same_class(self, other) 644 645 return (self.pp == other.pp and self.link_quality_3 == other.link_quality_3 and 646 self.link_quality_2 == other.link_quality_2 and self.link_quality_1 == other.link_quality_1 and 647 self.leader_cost == other.leader_cost and self.id_sequence == other.id_sequence and 648 self.active_routers == other.active_routers and self.sed_buffer_size == other.sed_buffer_size and 649 self.sed_datagram_count == other.sed_datagram_count) 650 651 def __repr__(self): 652 return r"Connectivity(pp={}, \ 653 link_quality_3={}, \ 654 link_quality_2={}, \ 655 link_quality_1={}, \ 656 leader_cost={}, \ 657 id_sequence={}, \ 658 active_routers={}, \ 659 sed_buffer_size={}, \ 660 sed_datagram_count={})".format( 661 self.pp, 662 self.link_quality_3, 663 self.link_quality_2, 664 self.link_quality_1, 665 self.leader_cost, 666 self.id_sequence, 667 self.active_routers, 668 self.sed_buffer_size, 669 self.sed_datagram_count, 670 ) 671 672 673class ConnectivityFactory: 674 675 def parse(self, data, message_info): 676 pp_byte = ord(data.read(1)) 677 link_quality_3 = ord(data.read(1)) 678 link_quality_2 = ord(data.read(1)) 679 link_quality_1 = ord(data.read(1)) 680 leader_cost = ord(data.read(1)) 681 id_sequence = ord(data.read(1)) 682 active_routers = ord(data.read(1)) 683 684 sed_data = io.BytesIO(data.read(3)) 685 686 if len(sed_data.getvalue()) > 0: 687 sed_buffer_size = struct.unpack(">H", sed_data.read(2))[0] 688 sed_datagram_count = ord(sed_data.read(1)) 689 else: 690 sed_buffer_size = None 691 sed_datagram_count = None 692 693 return Connectivity( 694 pp_byte, 695 link_quality_3, 696 link_quality_2, 697 link_quality_1, 698 leader_cost, 699 id_sequence, 700 active_routers, 701 sed_buffer_size, 702 sed_datagram_count, 703 ) 704 705 706class LinkMargin(object): 707 708 def __init__(self, link_margin): 709 self._link_margin = link_margin 710 711 @property 712 def link_margin(self): 713 return self._link_margin 714 715 def __eq__(self, other): 716 common.expect_the_same_class(self, other) 717 718 return self.link_margin == other.link_margin 719 720 def __repr__(self): 721 return "LinkMargin(link_margin={})".format(self.link_margin) 722 723 724class LinkMarginFactory: 725 726 def parse(self, data, message_info): 727 link_margin = ord(data.read(1)) 728 return LinkMargin(link_margin) 729 730 731class Status(object): 732 733 def __init__(self, status): 734 self._status = status 735 736 @property 737 def status(self): 738 return self._status 739 740 def __eq__(self, other): 741 common.expect_the_same_class(self, other) 742 743 return self.status == other.status 744 745 def __repr__(self): 746 return "Status(status={})".format(self.status) 747 748 749class StatusFactory: 750 751 def parse(self, data, message_info): 752 status = ord(data.read(1)) 753 return Status(status) 754 755 756class Version(object): 757 758 def __init__(self, version): 759 self._version = version 760 761 @property 762 def version(self): 763 return self._version 764 765 def __eq__(self, other): 766 common.expect_the_same_class(self, other) 767 768 return self.version == other.version 769 770 def __repr__(self): 771 return "Version(version={})".format(self.version) 772 773 774class VersionFactory: 775 776 def parse(self, data, message_info): 777 version = struct.unpack(">H", data.read(2))[0] 778 return Version(version) 779 780 781class AddressFull(object): 782 783 def __init__(self, ipv6_address): 784 self._ipv6_address = ipv6_address 785 786 @property 787 def ipv6_address(self): 788 return self._ipv6_address 789 790 def __eq__(self, other): 791 common.expect_the_same_class(self, other) 792 793 return self.ipv6_address == other.ipv6_address 794 795 def __repr__(self): 796 return "AddressFull(ipv6_address={}')".format(hexlify(self.ipv6_address)) 797 798 799class AddressFullFactory: 800 801 def parse(self, data, message_info): 802 data.read(1) # first byte is ignored 803 ipv6_address = data.read(16) 804 return AddressFull(ipv6_address) 805 806 807class AddressCompressed(object): 808 809 def __init__(self, cid, iid): 810 self._cid = cid 811 self._iid = iid 812 813 @property 814 def cid(self): 815 return self._cid 816 817 @property 818 def iid(self): 819 return self._iid 820 821 def __eq__(self, other): 822 common.expect_the_same_class(self, other) 823 824 return self.cid == other.cid and self.iid == other.iid 825 826 def __repr__(self): 827 return "AddressCompressed(cid={}, iid={}')".format(self.cid, hexlify(self.iid)) 828 829 830class AddressCompressedFactory: 831 832 def parse(self, data, message_info): 833 cid = ord(data.read(1)) & 0x0F 834 iid = bytearray(data.read(8)) 835 return AddressCompressed(cid, iid) 836 837 838class AddressRegistration(object): 839 840 def __init__(self, addresses): 841 self._addresses = addresses 842 843 @property 844 def addresses(self): 845 return self._addresses 846 847 def __eq__(self, other): 848 common.expect_the_same_class(self, other) 849 850 return self.addresses == other.addresses 851 852 def __repr__(self): 853 addresses_str = ", ".join(["{}".format(address) for address in self.addresses]) 854 return "AddressRegistration(addresses=[{}])".format(addresses_str) 855 856 857class AddressRegistrationFactory: 858 859 def __init__(self, addr_compressed_factory, addr_full_factory): 860 self._addr_compressed_factory = addr_compressed_factory 861 self._addr_full_factory = addr_full_factory 862 863 def parse(self, data, message_info): 864 addresses = [] 865 866 while data.tell() < len(data.getvalue()): 867 compressed = (ord(data.read(1)) >> 7) & 0x01 868 data.seek(-1, io.SEEK_CUR) 869 870 if compressed: 871 addresses.append(self._addr_compressed_factory.parse(data, message_info)) 872 else: 873 addresses.append(self._addr_full_factory.parse(data, message_info)) 874 875 return AddressRegistration(addresses) 876 877 878class Channel(object): 879 880 def __init__(self, channel_page, channel): 881 self._channel_page = channel_page 882 self._channel = channel 883 884 @property 885 def channel_page(self): 886 return self._channel_page 887 888 @property 889 def channel(self): 890 return self._channel 891 892 def __eq__(self, other): 893 common.expect_the_same_class(self, other) 894 895 return (self.channel_page == other.channel_page and self.channel == other.channel) 896 897 def __repr__(self): 898 return "Channel(channel_page={}, channel={})".format(self.channel_page, self.channel) 899 900 901class ChannelFactory: 902 903 def parse(self, data, message_info): 904 channel_page = ord(data.read(1)) 905 channel = struct.unpack(">H", data.read(2))[0] 906 return Channel(channel_page, channel) 907 908 909class PanId: 910 911 def __init__(self, pan_id): 912 self._pan_id = pan_id 913 914 @property 915 def pan_id(self): 916 return self._pan_id 917 918 def __eq__(self, other): 919 common.expect_the_same_class(self, other) 920 921 return self.pan_id == other.pan_id 922 923 def __repr__(self): 924 return "PanId(pan_id={})".format(self.pan_id) 925 926 927class PanIdFactory: 928 929 def parse(self, data, message_info): 930 pan_id = struct.unpack(">H", data.read(2))[0] 931 return PanId(pan_id) 932 933 934class ActiveTimestamp(object): 935 936 def __init__(self, timestamp_seconds, timestamp_ticks, u): 937 self._timestamp_seconds = timestamp_seconds 938 self._timestamp_ticks = timestamp_ticks 939 self._u = u 940 941 @property 942 def timestamp_seconds(self): 943 return self._timestamp_seconds 944 945 @property 946 def timestamp_ticks(self): 947 return self._timestamp_ticks 948 949 @property 950 def u(self): 951 return self._u 952 953 def __eq__(self, other): 954 common.expect_the_same_class(self, other) 955 956 return (self.timestamp_seconds == other.timestamp_seconds and self.timestamp_ticks == other.timestamp_ticks and 957 self.u == other.u) 958 959 def __repr__(self): 960 return "ActiveTimestamp(timestamp_seconds={}, timestamp_ticks={}, u={})".format( 961 self.timestamp_seconds, self.timestamp_ticks, self.u) 962 963 964class ActiveTimestampFactory: 965 966 def parse(self, data, message_info): 967 seconds = bytearray([0x00, 0x00]) + bytearray(data.read(6)) 968 ticks = struct.unpack(">H", data.read(2))[0] 969 970 timestamp_seconds = struct.unpack(">Q", bytes(seconds))[0] 971 timestamp_ticks = ticks >> 1 972 u = ticks & 0x01 973 return ActiveTimestamp(timestamp_seconds, timestamp_ticks, u) 974 975 976class PendingTimestamp(object): 977 978 def __init__(self, timestamp_seconds, timestamp_ticks, u): 979 self._timestamp_seconds = timestamp_seconds 980 self._timestamp_ticks = timestamp_ticks 981 self._u = u 982 983 @property 984 def timestamp_seconds(self): 985 return self._timestamp_seconds 986 987 @property 988 def timestamp_ticks(self): 989 return self._timestamp_ticks 990 991 @property 992 def u(self): 993 return self._u 994 995 def __eq__(self, other): 996 common.expect_the_same_class(self, other) 997 998 return (self.timestamp_seconds == other.timestamp_seconds and self.timestamp_ticks == other.timestamp_ticks and 999 self.u == other.u) 1000 1001 def __repr__(self): 1002 return "PendingTimestamp(timestamp_seconds={}, timestamp_ticks={}, u={})".format( 1003 self.timestamp_seconds, self.timestamp_ticks, self.u) 1004 1005 1006class PendingTimestampFactory: 1007 1008 def parse(self, data, message_info): 1009 seconds = bytearray([0x00, 0x00]) + bytearray(data.read(6)) 1010 ticks = struct.unpack(">H", data.read(2))[0] 1011 1012 timestamp_seconds = struct.unpack(">Q", bytes(seconds))[0] 1013 timestamp_ticks = ticks >> 1 1014 u = ticks & 0x01 1015 return PendingTimestamp(timestamp_seconds, timestamp_ticks, u) 1016 1017 1018class ActiveOperationalDataset: 1019 # TODO: Not implemented yet 1020 1021 def __init__(self): 1022 print("ActiveOperationalDataset is not implemented yet.") 1023 1024 1025class ActiveOperationalDatasetFactory: 1026 1027 def parse(self, data, message_info): 1028 return ActiveOperationalDataset() 1029 1030 1031class PendingOperationalDataset: 1032 # TODO: Not implemented yet 1033 1034 def __init__(self): 1035 print("PendingOperationalDataset is not implemented yet.") 1036 1037 1038class PendingOperationalDatasetFactory: 1039 1040 def parse(self, data, message_info): 1041 return PendingOperationalDataset() 1042 1043 1044class ThreadDiscovery(object): 1045 1046 def __init__(self, tlvs): 1047 self._tlvs = tlvs 1048 1049 @property 1050 def tlvs(self): 1051 return self._tlvs 1052 1053 def __eq__(self, other): 1054 return self.tlvs == other.tlvs 1055 1056 def __repr__(self): 1057 return "ThreadDiscovery(tlvs={})".format(self.tlvs) 1058 1059 1060class ThreadDiscoveryFactory: 1061 1062 def __init__(self, thread_discovery_tlvs_factory): 1063 self._tlvs_factory = thread_discovery_tlvs_factory 1064 1065 def parse(self, data, message_info): 1066 tlvs = self._tlvs_factory.parse(data, message_info) 1067 return ThreadDiscovery(tlvs) 1068 1069 1070class CslChannel: 1071 # TODO: Not implemented yet 1072 1073 def __init__(self): 1074 print("CslChannel is not implemented yet.") 1075 1076 1077class CslChannelFactory: 1078 # TODO: Not implemented yet 1079 1080 def parse(self, data, message_info): 1081 return CslChannel() 1082 1083 1084class CslSynchronizedTimeout: 1085 # TODO: Not implemented yet 1086 1087 def __init__(self): 1088 print("CslSynchronizedTimeout is not implemented yet.") 1089 1090 1091class CslSynchronizedTimeoutFactory: 1092 1093 def parse(self, data, message_info): 1094 return CslSynchronizedTimeout() 1095 1096 1097class CslClockAccuracy: 1098 # TODO: Not implemented yet 1099 1100 def __init__(self): 1101 print("CslClockAccuracy is not implemented yet.") 1102 1103 1104class CslClockAccuracyFactory: 1105 1106 def parse(self, data, message_info): 1107 return CslClockAccuracy() 1108 1109 1110class TimeRequest: 1111 # TODO: Not implemented yet 1112 1113 def __init__(self): 1114 print("TimeRequest is not implemented yet.") 1115 1116 1117class TimeRequestFactory: 1118 1119 def parse(self, data, message_info): 1120 return TimeRequest() 1121 1122 1123class TimeParameter: 1124 # TODO: Not implemented yet 1125 1126 def __init__(self): 1127 print("TimeParameter is not implemented yet.") 1128 1129 1130class TimeParameterFactory: 1131 1132 def parse(self, data, message_info): 1133 return TimeParameter() 1134 1135 1136class LinkMetricsQuery: 1137 # TODO: Not implemented yet 1138 1139 def __init__(self): 1140 print("LinkMetricsQuery is not implemented yet.") 1141 1142 1143class LinkMetricsQueryFactory: 1144 1145 def parse(self, data, message_info): 1146 return LinkMetricsQuery() 1147 1148 1149class LinkMetricsManagement: 1150 # TODO: Not implemented yet 1151 1152 def __init__(self): 1153 print("LinkMetricsManagement is not implemented yet.") 1154 1155 1156class LinkMetricsManagementFactory: 1157 1158 def parse(self, data, message_info): 1159 return LinkMetricsManagement() 1160 1161 1162class LinkMetricsReport: 1163 # TODO: Not implemented yet 1164 1165 def __init__(self): 1166 print("LinkMetricsReport is not implemented yet.") 1167 1168 1169class LinkMetricsReportFactory: 1170 1171 def parse(self, data, message_info): 1172 return LinkMetricsReport() 1173 1174 1175class LinkProbe: 1176 # TODO: Not implemented yet 1177 1178 def __init__(self): 1179 print("LinkProbe is not implemented yet.") 1180 1181 1182class LinkProbeFactory: 1183 1184 def parse(self, data, message_info): 1185 return LinkProbe() 1186 1187 1188class MleCommand(object): 1189 1190 def __init__(self, _type, tlvs): 1191 self._type = _type 1192 self._tlvs = tlvs 1193 1194 @property 1195 def type(self): 1196 return self._type 1197 1198 @property 1199 def tlvs(self): 1200 return self._tlvs 1201 1202 def __repr__(self): 1203 tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.tlvs]) 1204 return "MleCommand(type={}, tlvs=[{}])".format(self.type.name, tlvs_str) 1205 1206 1207class MleCommandFactory: 1208 1209 _MARKER_EXTENDED_LENGTH = 0xff 1210 1211 def __init__(self, tlvs_factories): 1212 self._tlvs_factories = tlvs_factories 1213 1214 def _get_length(self, data): 1215 length = ord(data.read(1)) 1216 1217 if length == self._MARKER_EXTENDED_LENGTH: 1218 length = struct.unpack(">H", data.read(2))[0] 1219 1220 return length 1221 1222 def _get_tlv_factory(self, _type): 1223 try: 1224 return self._tlvs_factories[_type] 1225 except KeyError: 1226 logging.error('Could not find TLV factory. Unsupported TLV type: {}'.format(_type)) 1227 return UnknownTlvFactory(_type) 1228 1229 def _parse_tlv(self, data, message_info): 1230 _type = TlvType(ord(data.read(1))) 1231 length = self._get_length(data) 1232 value = data.read(length) 1233 1234 factory = self._get_tlv_factory(_type) 1235 1236 return factory.parse(io.BytesIO(value), message_info) 1237 1238 def parse(self, data, message_info): 1239 cmd_type = CommandType(ord(data.read(1))) 1240 tlvs = [] 1241 1242 while data.tell() < len(data.getvalue()): 1243 tlv = self._parse_tlv(data, message_info) 1244 tlvs.append(tlv) 1245 1246 return MleCommand(cmd_type, tlvs) 1247 1248 1249class MleMessage(object): 1250 1251 def __init__(self, command): 1252 self._command = command 1253 1254 @property 1255 def command(self): 1256 return self._command 1257 1258 def __repr__(self): 1259 return "MleMessage(command={})".format(self.command) 1260 1261 1262class MleMessageSecured(MleMessage): 1263 1264 def __init__(self, aux_sec_hdr, command, mic): 1265 super(MleMessageSecured, self).__init__(command) 1266 self._aux_sec_hdr = aux_sec_hdr 1267 self._mic = mic 1268 1269 @property 1270 def aux_sec_hdr(self): 1271 return self._aux_sec_hdr 1272 1273 @property 1274 def mic(self): 1275 return self._mic 1276 1277 def __repr__(self): 1278 return "MleMessageSecured(aux_sec_hdr={}, command={}, mic=\"{}\")".format(self.aux_sec_hdr, self.command, 1279 hexlify(self.mic)) 1280 1281 1282class MleMessageFactory: 1283 1284 def __init__(self, aux_sec_hdr_factory, mle_command_factory, crypto_engine): 1285 self._aux_sec_hdr_factory = aux_sec_hdr_factory 1286 self._mle_command_factory = mle_command_factory 1287 self._crypto_engine = crypto_engine 1288 1289 def _create_mle_secured_message(self, data, message_info): 1290 aux_sec_hdr = self._aux_sec_hdr_factory.parse(data, message_info) 1291 1292 enc_data_length = len(data.getvalue()) 1293 1294 enc_data = bytearray(data.read(enc_data_length - data.tell() - self._crypto_engine.mic_length)) 1295 mic = bytearray(data.read()) 1296 1297 dec_data = self._crypto_engine.decrypt(enc_data, mic, message_info) 1298 1299 command = self._mle_command_factory.parse(io.BytesIO(dec_data), message_info) 1300 1301 return MleMessageSecured(aux_sec_hdr, command, mic) 1302 1303 def _create_mle_message(self, data, message_info): 1304 command = self._mle_command_factory.parse(data, message_info) 1305 1306 return MleMessage(command) 1307 1308 def parse(self, data, message_info): 1309 security_indicator = ord(data.read(1)) 1310 1311 if security_indicator == 0: 1312 return self._create_mle_secured_message(data, message_info) 1313 1314 elif security_indicator == 255: 1315 return self._create_mle_message(data, message_info) 1316 1317 else: 1318 raise RuntimeError( 1319 "Could not create MLE message. Unknown security indicator value: {}".format(security_indicator)) 1320