#!/usr/bin/env python3
#
#  Copyright (c) 2016, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import abc
import io
import ipaddress
import struct

from binascii import hexlify

import common

try:
    from itertools import izip_longest as zip_longest
except ImportError:
    from itertools import zip_longest

# Next headers for IPv6 protocols
IPV6_NEXT_HEADER_HOP_BY_HOP = 0
IPV6_NEXT_HEADER_TCP = 6
IPV6_NEXT_HEADER_UDP = 17
IPV6_NEXT_HEADER_FRAGMENT = 44
IPV6_NEXT_HEADER_ICMP = 58

UPPER_LAYER_PROTOCOLS = [
    IPV6_NEXT_HEADER_TCP,
    IPV6_NEXT_HEADER_UDP,
    IPV6_NEXT_HEADER_ICMP,
]

# ICMPv6 message types
ICMP_DESTINATION_UNREACHABLE = 1
ICMP_TIME_EXCEEDED = 3
ICMP_ECHO_REQUEST = 128
ICMP_ECHO_RESPONSE = 129

# Default hop limit for IPv6
HOP_LIMIT_DEFAULT = 64


def calculate_checksum(data):
    """ Calculate checksum from data bytes.

    How to calculate checksum (RFC 2460):
    https://tools.ietf.org/html/rfc2460#page-27

    Args:
        data (bytes): input data from which checksum will be calculated

    Returns:
        int: calculated checksum. A computed value of zero is reported as
            0xffff, so this function never returns zero.
    """
    checksum = 0

    # Pair consecutive bytes into big-endian halfwords, e.g. data[0] = 0x01,
    # data[1] = 0xb2 => 0x01b2. Odd-length input is padded with a zero byte.
    for high, low in zip_longest(data[::2], data[1::2], fillvalue=0x00):
        checksum += (high << 8) | low
        # Fold the carry back into the low 16 bits (one's complement addition).
        checksum = (checksum & 0xffff) + (checksum >> 16)

    checksum ^= 0xffff

    return checksum if checksum != 0 else 0xffff


def synthesize_ip6_address(ip6_network: ipaddress.IPv6Network,
                           ip4_address: ipaddress.IPv4Address) -> ipaddress.IPv6Address:
    """ Synthesize an IPv6 address from a prefix for NAT64 and an IPv4 address.

    Only supports /96 network for now.

    Args:
        ip6_network: The network for NAT64.
        ip4_address: The IPv4 address.

    Returns:
        ipaddress.IPv6Address: The synthesized IPv6 address.

    Raises:
        NotImplementedError: If the prefix length of ip6_network is not 96.
    """
    if ip6_network.prefixlen != 96:
        # We are only using /96 networks in openthread
        raise NotImplementedError("synthesize_ip6_address only supports /96 networks")

    # With a /96 prefix the IPv4 address occupies exactly the low 32 bits.
    return ipaddress.IPv6Address(int(ip6_network.network_address) | int(ip4_address))


class PacketFactory(object):
    """ Interface for classes that produce objects from data. """

    def parse(self, data, message_info):
        """ Convert data to object.

        Args:
            data (BytesIO)
            message_info (MessageInfo)

        """
        raise NotImplementedError


class BuildableFromBytes(object):
    """ Interface for classes which can be built from bytes. """

    @classmethod
    def from_bytes(cls, data):
        """ Convert data to object.

        Args:
            data (bytes)

        """
        raise NotImplementedError


class ConvertibleToBytes(object):
    """ Interface for classes which can be converted to bytes. """

    def to_bytes(self):
        """ Convert object to data.

        Returns:
            bytes
        """
        raise NotImplementedError

    def __len__(self):
        """ Length of data (in bytes).

        Returns:
            int
        """
        raise NotImplementedError


class Header(object):
    """ Interface for header classes. """

    # NOTE(review): this is the Python 2 metaclass spelling; Python 3 ignores
    # it, so `type` is not enforced as abstract at instantiation time. Left
    # untouched to keep instantiation behaviour unchanged.
    __metaclass__ = abc.ABCMeta

    @abc.abstractproperty
    def type(self):
        """ Number which can be used in the next header field in IPv6 header or next headers.

        Returns:
            int
        """


class ExtensionHeader(object):
    """ Base for classes representing Extension Headers in IPv6 packets. """

    def __init__(self, next_header, hdr_ext_len=0):
        self.next_header = next_header
        self.hdr_ext_len = hdr_ext_len


class UpperLayerProtocol(Header, ConvertibleToBytes):
    """ Base for classes representing upper layer protocol payload in IPv6 packets. """

    def __init__(self, header):
        self.header = header

    @property
    def checksum(self):
        """ Return checksum from upper layer protocol header. """
        return self.header.checksum

    @checksum.setter
    def checksum(self, value):
        """ Set checksum value in upper layer protocol header. """
        self.header.checksum = value

    def is_valid_checksum(self):
        """ Return information if set checksum is valid.

        It is not possible to get zero from checksum calculation.
        Zero indicates invalid checksum value.

        Returns:
            bool
        """
        return self.checksum != 0


class IPv6PseudoHeader(ConvertibleToBytes):
    """ Class representing IPv6 pseudo header which is required to calculate
    upper layer protocol (like e.g. UDP or ICMPv6) checksum.

    This class is used only during upper layer protocol checksum calculation. Do not use it outside of this module.

    """

    def __init__(self, source_address, destination_address, payload_length, next_header):
        self._source_address = self._convert_to_ipaddress(source_address)
        self._destination_address = self._convert_to_ipaddress(destination_address)
        self.payload_length = payload_length
        self.next_header = next_header

    def _convert_to_ipaddress(self, value):
        """ Convert value to an ipaddress object; bytearray is first copied to bytes. """
        if isinstance(value, bytearray):
            value = bytes(value)

        return ipaddress.ip_address(value)

    @property
    def source_address(self):
        return self._source_address

    @source_address.setter
    def source_address(self, value):
        self._source_address = self._convert_to_ipaddress(value)

    @property
    def destination_address(self):
        return self._destination_address

    @destination_address.setter
    def destination_address(self, value):
        # Fixed: this setter used to overwrite the *source* address.
        self._destination_address = self._convert_to_ipaddress(value)

    def to_bytes(self):
        """ Serialize as: source, destination, 32-bit payload length, 32-bit next header.

        Returns:
            bytearray
        """
        data = bytearray()
        data += self.source_address.packed
        data += self.destination_address.packed
        data += struct.pack(">I", self.payload_length)
        data += struct.pack(">I", self.next_header)

        return data


class IPv6Header(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing IPv6 packet header. """

    _version = 6

    _header_length = 40

    def __init__(
        self,
        source_address,
        destination_address,
        traffic_class=0,
        flow_label=0,
        hop_limit=HOP_LIMIT_DEFAULT,
        payload_length=0,
        next_header=0,
    ):
        self.version = self._version
        self._source_address = self._convert_to_ipaddress(source_address)
        self._destination_address = self._convert_to_ipaddress(destination_address)
        self.traffic_class = traffic_class
        self.flow_label = flow_label
        self.hop_limit = hop_limit
        self.payload_length = payload_length
        self.next_header = next_header

    def _convert_to_ipaddress(self, value):
        """ Convert value to an ipaddress object; bytearray is first copied to bytes. """
        if isinstance(value, bytearray):
            value = bytes(value)

        return ipaddress.ip_address(value)

    @property
    def source_address(self):
        return self._source_address

    @source_address.setter
    def source_address(self, value):
        self._source_address = self._convert_to_ipaddress(value)

    @property
    def destination_address(self):
        return self._destination_address

    @destination_address.setter
    def destination_address(self, value):
        # Added for symmetry with source_address.
        self._destination_address = self._convert_to_ipaddress(value)

    def to_bytes(self):
        """ Serialize the header.

        Returns:
            bytearray
        """
        # First 32 bits: version (4 bits) | traffic class (8 bits) | flow label (20 bits)
        data = bytearray([
            ((self.version & 0x0F) << 4) | ((self.traffic_class >> 4) & 0x0F),
            ((self.traffic_class & 0x0F) << 4) | ((self.flow_label >> 16) & 0x0F),
            ((self.flow_label >> 8) & 0xff),
            (self.flow_label & 0xff),
        ])
        data += struct.pack(">H", self.payload_length)
        data += bytearray([self.next_header, self.hop_limit])
        data += self.source_address.packed
        data += self.destination_address.packed

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Build a header by reading 40 bytes from data.

        The version nibble is not validated.

        Args:
            data (BytesIO)

        Returns:
            IPv6Header
        """
        b = bytearray(data.read(4))

        traffic_class = ((b[0] & 0x0F) << 4) | ((b[1] >> 4) & 0x0F)
        flow_label = ((b[1] & 0x0F) << 16) | (b[2] << 8) | b[3]

        payload_length = struct.unpack(">H", data.read(2))[0]
        next_header = ord(data.read(1))
        hop_limit = ord(data.read(1))
        src_addr = bytearray(data.read(16))
        dst_addr = bytearray(data.read(16))

        return cls(
            src_addr,
            dst_addr,
            traffic_class,
            flow_label,
            hop_limit,
            payload_length,
            next_header,
        )

    def __repr__(self):
        # Adjacent literals instead of a backslash continuation inside the
        # string, which leaked the source indentation into the output.
        return ("IPv6Header(source_address={}, destination_address={}, next_header={}, payload_length={}, "
                "hop_limit={}, traffic_class={}, flow_label={})").format(
                    self.source_address.compressed,
                    self.destination_address.compressed,
                    self.next_header,
                    self.payload_length,
                    self.hop_limit,
                    self.traffic_class,
                    self.flow_label,
                )

    def __len__(self):
        return self._header_length


class IPv6Packet(ConvertibleToBytes):
    """ Class representing IPv6 packet.

    IPv6 packet consists of IPv6 header, optional extension header, and upper layer protocol.

                                            IPv6 packet

    +-------------+----------------------------------+----------------------------------------------+
    |             |                                  |                                              |
    | IPv6 header | extension headers (zero or more) | upper layer protocol (e.g. UDP, TCP, ICMPv6) |
    |             |                                  |                                              |
    +-------------+----------------------------------+----------------------------------------------+

    Extension headers:
        - HopByHop
        - Routing header (not implemented in this module)
        - Fragment Header

    Upper layer protocols:
        - ICMPv6
        - UDP
        - TCP (not implemented in this module)

    Example:
        IPv6 packet construction without extension headers:

        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
                                 ICMPv6(ICMPv6Header(128, 0),
                                        ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                                    0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                                    0x41, 0x41]))))

        IPv6 packet construction with extension headers:

        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
                                 ICMPv6(ICMPv6Header(128, 0),
                                        ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                                    0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                                    0x41, 0x41]))),
                                 [HopByHop(options=[
                                     HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                                    MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))
                                 ])])

    """

    def __init__(self, ipv6_header, upper_layer_protocol, extension_headers=None):
        self.ipv6_header = ipv6_header

        self.upper_layer_protocol = upper_layer_protocol

        self.extension_headers = (extension_headers if extension_headers is not None else [])

        self._update_next_header_values_in_headers()

        # A zero checksum means "not set yet" (see UpperLayerProtocol.is_valid_checksum).
        if not upper_layer_protocol.is_valid_checksum():
            self.upper_layer_protocol.checksum = self.calculate_checksum()

    def _validate_checksum(self):
        """ Raise RuntimeError when the stored checksum differs from the calculated one. """
        checksum = self.calculate_checksum()

        if self.upper_layer_protocol.checksum != checksum:
            raise RuntimeError("Could not create IPv6 packet. "
                               "Invalid checksum: {}!={}".format(self.upper_layer_protocol.checksum, checksum))

        self.upper_layer_protocol.checksum = checksum

    def _update_payload_length_value_in_ipv6_header(self):
        """ Set payload length to the size of all extension headers plus the upper layer protocol. """
        self.ipv6_header.payload_length = len(self.upper_layer_protocol) + sum(
            len(extension_header) for extension_header in self.extension_headers)

    def _update_next_header_values_in_headers(self):
        """ Chain next_header values: IPv6 header -> extension headers -> upper layer protocol. """
        last_header = self.ipv6_header

        for extension_header in self.extension_headers:
            last_header.next_header = extension_header.type
            last_header = extension_header

        last_header.next_header = self.upper_layer_protocol.type

    def calculate_checksum(self):
        """ Calculate the upper layer protocol checksum over pseudo header and payload.

        The stored checksum is temporarily zeroed for serialization and then restored.

        Returns:
            int
        """
        saved_checksum = self.upper_layer_protocol.checksum

        self.upper_layer_protocol.checksum = 0

        upper_layer_protocol_bytes = self.upper_layer_protocol.to_bytes()

        self.upper_layer_protocol.checksum = saved_checksum

        pseudo_header = IPv6PseudoHeader(
            self.ipv6_header.source_address,
            self.ipv6_header.destination_address,
            len(upper_layer_protocol_bytes),
            self.upper_layer_protocol.type,
        )

        return calculate_checksum(pseudo_header.to_bytes() + upper_layer_protocol_bytes)

    def to_bytes(self):
        """ Serialize the packet, refreshing payload length, next headers and checksum first.

        Returns:
            bytearray
        """
        self._update_payload_length_value_in_ipv6_header()
        self._update_next_header_values_in_headers()
        self.upper_layer_protocol.checksum = self.calculate_checksum()

        ipv6_packet = self.ipv6_header.to_bytes()

        for extension_header in self.extension_headers:
            ipv6_packet += extension_header.to_bytes()

        ipv6_packet += self.upper_layer_protocol.to_bytes()

        return ipv6_packet

    def __repr__(self):
        return "IPv6Packet(header={}, upper_layer_protocol={})".format(self.ipv6_header, self.upper_layer_protocol)


class UDPHeader(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing UDP datagram header.

    This header is required to construct UDP datagram.

    """

    _header_length = 8

    def __init__(self, src_port, dst_port, payload_length=0, checksum=0):
        self.src_port = src_port
        self.dst_port = dst_port

        # Stored verbatim here; only the property setter adds the header length.
        self._payload_length = payload_length
        self.checksum = checksum

    @property
    def type(self):
        return IPV6_NEXT_HEADER_UDP

    @property
    def payload_length(self):
        return self._payload_length

    @payload_length.setter
    def payload_length(self, value):
        # The value assigned is the payload size; the stored length also
        # counts the 8-byte UDP header.
        self._payload_length = self._header_length + value

    def to_bytes(self):
        """ Serialize as four big-endian 16-bit fields: ports, length, checksum.

        Returns:
            bytes
        """
        data = struct.pack(">H", self.src_port)
        data += struct.pack(">H", self.dst_port)
        data += struct.pack(">H", self.payload_length)
        data += struct.pack(">H", self.checksum)

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Build a header by reading 8 bytes from data (BytesIO). """
        src_port = struct.unpack(">H", data.read(2))[0]
        dst_port = struct.unpack(">H", data.read(2))[0]
        payload_length = struct.unpack(">H", data.read(2))[0]
        checksum = struct.unpack(">H", data.read(2))[0]

        return cls(src_port, dst_port, payload_length, checksum)

    def __len__(self):
        return self._header_length


class UDPDatagram(UpperLayerProtocol):
    """ Class representing UDP datagram.

    UDP is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.

    This class consists of a UDP header and payload. The payload must provide to_bytes() and __len__()
    (e.g. BytesPayload). The example below shows how a UDP datagram can be constructed.

    Example:
        udp_dgram = UDPDatagram(UDPHeader(src_port=19788, dst_port=19788),
                                BytesPayload(bytes([0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                                    0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03,
                                                    0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80,
                                                    0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef])))

    """

    @property
    def type(self):
        return IPV6_NEXT_HEADER_UDP

    def __init__(self, header, payload):
        super(UDPDatagram, self).__init__(header)
        self.payload = payload

    def to_bytes(self):
        """ Serialize header and payload; updates the header length field as a side effect.

        Returns:
            bytearray
        """
        self.header.payload_length = len(self.payload)

        data = bytearray()
        data += self.header.to_bytes()
        data += self.payload.to_bytes()
        return data

    def __len__(self):
        return len(self.header) + len(self.payload)


class ICMPv6Header(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing ICMPv6 message header.

    This header is required to construct ICMPv6 message.

    """

    _header_length = 4

    def __init__(self, _type, code, checksum=0):
        self.type = _type
        self.code = code

        self.checksum = checksum

    def to_bytes(self):
        return bytearray([self.type, self.code]) + struct.pack(">H", self.checksum)

    @classmethod
    def from_bytes(cls, data):
        """ Build a header by reading 4 bytes from data (BytesIO). """
        _type = ord(data.read(1))
        code = ord(data.read(1))
        checksum = struct.unpack(">H", data.read(2))[0]

        return cls(_type, code, checksum)

    def __len__(self):
        return self._header_length


class ICMPv6(UpperLayerProtocol):
    """ Class representing ICMPv6 message.

    ICMPv6 is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.

    This class consists of an ICMPv6 header and body. The example below shows how an ICMPv6 message can be constructed.

    Example:
        icmpv6_msg = ICMPv6(ICMPv6Header(128, 0),
                            ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                        0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                        0x41, 0x41])))

    """

    @property
    def type(self):
        return IPV6_NEXT_HEADER_ICMP

    def __init__(self, header, body):
        super(ICMPv6, self).__init__(header)
        self.body = body

    def to_bytes(self):
        return bytearray(self.header.to_bytes() + self.body.to_bytes())

    def __len__(self):
        return len(self.header) + len(self.body)


class FragmentHeader(ExtensionHeader):
    """ Class representing Fragment extension header.

    +-------------+----------+-----------------+-----+---+----------------+
    | Next Header | Reserved | Fragment Offset | Res | M | Identification |
    +-------------+----------+-----------------+-----+---+----------------+

    Fragment extension header consists of:
        - next_header type (8 bit)
        - fragment offset which is multiple of 8 (13 bit)
        - more_flag to indicate further data (1 bit)
        - identification for all associated fragments (32 bit)
    """

    # Serialized size in bytes: 1 + 1 + 2 + 4 (matches what to_bytes emits).
    _header_length = 8

    @property
    def type(self):
        return IPV6_NEXT_HEADER_FRAGMENT

    @property
    def identification(self):
        return self._identification

    @property
    def more_flag(self):
        return self._more_flag

    @property
    def offset(self):
        return self._fragm_offset

    def __init__(self, next_header=None, fragm_offset=0, more_flag=False, identification=0):
        super(FragmentHeader, self).__init__(next_header, 0)
        self._fragm_offset = fragm_offset
        self._more_flag = more_flag
        self._identification = identification

    def calculate_offset(self, position):
        """ Convert a byte position into a fragment offset expressed in 8-byte units. """
        return position >> 3

    # Backward-compatible alias for the original, misspelled method name.
    callculate_offset = calculate_offset

    def to_bytes(self):
        """ Serialize the header into 8 bytes.

        Returns:
            bytearray
        """
        data = bytearray([self.next_header, 0x00])
        # 13-bit offset in the high bits, M flag in bit 0 of the 16-bit field.
        data += bytearray([self._fragm_offset >> 5, ((self._fragm_offset << 3) | self._more_flag) & 0xff])
        data += struct.pack(">I", self._identification)

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Build a header by reading 8 bytes from data (BytesIO). """
        next_header, _reserved, offset_and_flags, identification = struct.unpack(">BBHI", data.read(8))

        more_flag = offset_and_flags & 0x1
        fragment_offset = offset_and_flags >> 3

        return cls(next_header, fragment_offset, more_flag, identification)

    def __len__(self):
        # Fixed: used to return 64 (the size in bits), which inflated the
        # IPv6 payload length computed from len() of the extension headers.
        return self._header_length


class HopByHop(ExtensionHeader):
    """ Class representing HopByHop extension header.

    HopByHop extension header consists of:
        - next_header type
        - extension header length which is multiple of 8
        - options

    """

    _one_byte_padding = 0x00
    _many_bytes_padding = 0x01

    @property
    def type(self):
        return IPV6_NEXT_HEADER_HOP_BY_HOP

    def __init__(self, next_header=None, options=None, hdr_ext_len=None):
        super(HopByHop, self).__init__(next_header, hdr_ext_len)
        self.options = options if options is not None else []

        if hdr_ext_len is not None:
            self.hdr_ext_len = hdr_ext_len
        else:
            # Derive the length field from the options when not given explicitly.
            payload_length = self._calculate_payload_length()
            self.hdr_ext_len = self._calculate_hdr_ext_len(payload_length)

    def _calculate_payload_length(self):
        """ Return the unpadded size: 2 bytes (next header, length) plus all options. """
        payload_length = 2

        for option in self.options:
            payload_length += len(option)

        return payload_length

    def _calculate_hdr_ext_len(self, payload_length):
        """ Return the number of 8-byte units needed, not counting the first one. """
        count = payload_length >> 3

        if (payload_length & 0x7) == 0 and count > 0:
            return count - 1

        return count

    def to_bytes(self):
        """ Serialize the header and its options, padded to a multiple of 8 bytes.

        Returns:
            bytearray
        """
        data = bytearray([self.next_header, self.hdr_ext_len])

        for option in self.options:
            data += option.to_bytes()

        # Padding
        #
        # More details:
        #   https://tools.ietf.org/html/rfc2460#section-4.2
        #
        excess_bytes = len(data) & 0x7

        if excess_bytes > 0:
            padding_length = 8 - excess_bytes

            if padding_length == 1:
                data += bytearray([self._one_byte_padding])

            else:
                # PadN: type byte + length byte + (padding_length - 2) zero bytes.
                padding_length -= 2
                data += bytearray([self._many_bytes_padding, padding_length])
                data += bytearray([0x00 for _ in range(padding_length)])

        return data

    def __len__(self):
        """ HopByHop extension header length

        More details:
            https://tools.ietf.org/html/rfc2460#section-4.3

        """
        return (self.hdr_ext_len + 1) * 8


class HopByHopOptionHeader(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing HopByHop option header. """

    _header_length = 2

    def __init__(self, _type, length=None):
        self.type = _type
        self.length = length if length is not None else 0

    def to_bytes(self):
        return bytearray([self.type, self.length])

    @classmethod
    def from_bytes(cls, data):
        """ Build an option header by reading 2 bytes (type, length) from data (BytesIO). """
        _type = ord(data.read(1))
        length = ord(data.read(1))
        return cls(_type, length)

    def __len__(self):
        return self._header_length

    def __repr__(self):
        return "HopByHopOptionHeader(type={}, length={})".format(self.type, self.length)


class HopByHopOption(ConvertibleToBytes):
    """ Class representing HopByHop option.

    Class consists of two elements: HopByHopOptionHeader and value (e.g. for MPLOption).

    The following example shows how any HopByHop option can be constructed.

    Example:
        HopByHop(next_header=0x3a,
                 options=[HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                         MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))])

    """

    def __init__(self, header, value):
        self.value = value

        self.header = header
        # The header length field always reflects the size of the value.
        self.header.length = len(self.value)

    def to_bytes(self):
        return self.header.to_bytes() + self.value.to_bytes()

    def __len__(self):
        return len(self.header) + len(self.value)

    def __repr__(self):
        return "HopByHopOption(header={}, value={})".format(self.header, self.value)


class MPLOption(ConvertibleToBytes):
    """ Class representing MPL option. """

    _header_length = 2

    # Maps the S field to the seed-id length in bytes.
    _seed_id_length = {0: 0, 1: 2, 2: 8, 3: 16}

    def __init__(self, S, M, V, sequence, seed_id):
        self.S = S
        self.M = M
        self.V = V
        self.sequence = sequence
        self.seed_id = seed_id

    def to_bytes(self):
        # First byte layout: S (2 bits) | M (1 bit) | V (1 bit) | 4 unused bits.
        smv = (((self.S & 0x03) << 6) | ((self.M & 0x01) << 5) | ((self.V & 0x01) << 4))

        return bytearray([smv, self.sequence]) + self.seed_id

    @classmethod
    def from_bytes(cls, data):
        """ Build an option by reading flags, sequence and seed id from data (BytesIO). """
        b = ord(data.read(1))

        s = (b >> 6) & 0x03
        m = (b >> 5) & 0x01
        v = (b >> 4) & 0x01

        sequence = ord(data.read(1))
        seed_id = data.read(cls._seed_id_length[s])

        return cls(s, m, v, sequence, seed_id)

    def __len__(self):
        return self._header_length + self._seed_id_length[self.S]

    def __repr__(self):
        return "MPLOption(S={}, M={}, V={}, sequence={}, seed_id={})".format(self.S, self.M, self.V, self.sequence,
                                                                             hexlify(self.seed_id))


class IPv6PacketFactory(PacketFactory):
    """ Factory that produces IPv6 packets from data.

    This factory must be initialized with factories which allow to parse extension headers and upper layer protocols.

    The following example shows preferable setup of IPv6PacketFactory.

    Header types:
        0: HopByHop
        17: UDP
        58: ICMPv6

    Option types:
        109: MPL

    ICMPv6 body types:
        128: Echo request
        129: Echo response

    Example usage:

        ipv6_factory = IPv6PacketFactory(
            ehf={
                0: HopByHopFactory(HopByHopOptionsFactory(options_factories={
                    109: MPLOptionFactory()
                }))
            },
            ulpf={
                17: UDPDatagramFactory(UDPHeaderFactory(), UdpBasedOnSrcDstPortsPayloadFactory({
                    19788: MLEMessageFactory(),
                    19789: CoAPMessageFactory()
                })),
                58: ICMPv6Factory(body_factories={
                    128: ICMPv6EchoBodyFactory(),
                    129: ICMPv6EchoBodyFactory()
                })
            }
        )

    """

    def __init__(self, ehf=None, ulpf=None):
        """
        ehf - Extension Header Factory
        ulpf - Upper Layer Protocol Factory

        Args:
            ehf(dict[int: PacketFactory]): Dictionary mapping extension header types on specialized factories.
            ulpf(dict[int: PacketFactory]): Dictionary mapping upper layer protocol types on specialized factories.
        """
        self._ehf = ehf if ehf is not None else {}
        self._ulpf = ulpf if ulpf is not None else {}

    def _is_extension_header(self, header_type):
        # Anything that is not a known upper layer protocol is treated as an extension header.
        return header_type not in UPPER_LAYER_PROTOCOLS

    def _get_extension_header_factory_for(self, next_header):
        try:
            return self._ehf[next_header]
        except KeyError:
            raise RuntimeError("Could not get Extension Header factory for next_header={}.".format(next_header))

    def _get_upper_layer_protocol_factory_for(self, next_header):
        try:
            return self._ulpf[next_header]
        except KeyError:
            raise RuntimeError("Could not get Upper Layer Protocol factory for next_header={}.".format(next_header))

    def _parse_extension_headers(self, data, next_header, message_info):
        """ Parse consecutive extension headers.

        Returns:
            tuple: (next_header of the upper layer protocol, list of parsed extension headers)
        """
        extension_headers = []

        while self._is_extension_header(next_header):
            factory = self._get_extension_header_factory_for(next_header)

            extension_header = factory.parse(data, message_info)

            next_header = extension_header.next_header

            extension_headers.append(extension_header)

        return next_header, extension_headers

    def _parse_upper_layer_protocol(self, data, next_header, message_info):
        factory = self._get_upper_layer_protocol_factory_for(next_header)

        return factory.parse(data, message_info)

    def parse(self, data, message_info):
        """ Parse a whole IPv6 packet from data.

        Stores the source and destination addresses in message_info.

        Args:
            data (BytesIO)
            message_info (MessageInfo)

        Returns:
            IPv6Packet

        Raises:
            RuntimeError: If no factory is registered for a next header value.
        """
        ipv6_header = IPv6Header.from_bytes(data)

        message_info.source_ipv6 = ipv6_header.source_address
        message_info.destination_ipv6 = ipv6_header.destination_address

        next_header, extension_headers = self._parse_extension_headers(data, ipv6_header.next_header, message_info)

        upper_layer_protocol = self._parse_upper_layer_protocol(data, next_header, message_info)

        return IPv6Packet(ipv6_header, upper_layer_protocol, extension_headers)


class HopByHopOptionsFactory(object):
    """ Factory that produces HopByHop options. """

    _one_byte_padding = 0x00
    _many_bytes_padding = 0x01

    def __init__(self, options_factories=None):
        self._options_factories = (options_factories if options_factories is not None else {})

    def _get_HopByHopOption_value_factory(self, _type):
        try:
            return self._options_factories[_type]
        except KeyError:
            raise RuntimeError("Could not find HopByHopOption value factory for type={}.".format(_type))

    def parse(self, data, message_info):
        """ Parse all options from data, dropping Pad1 and PadN padding.

        Args:
            data (BytesIO): Options area of a HopByHop header.
            message_info (MessageInfo)

        Returns:
            list: Parsed HopByHopOption objects.

        Raises:
            RuntimeError: If no value factory is registered for an option type.
        """
        options = []
        end = len(data.getvalue())

        while data.tell() < end:
            option_type = ord(data.read(1))

            if option_type == self._one_byte_padding:
                # Pad1 is a lone type byte without a length field, so nothing
                # more may be consumed. (Previously a 2-byte header plus one
                # extra byte were read, which swallowed the following data or
                # failed when Pad1 was the last byte.)
                continue

            option_header = HopByHopOptionHeader(option_type, ord(data.read(1)))

            if option_header.type == self._many_bytes_padding:
                # skip n bytes padding
                data.read(option_header.length)
                continue

            factory = self._get_HopByHopOption_value_factory(option_header.type)

            option_data = data.read(option_header.length)

            options.append(HopByHopOption(
                option_header,
                factory.parse(io.BytesIO(option_data), message_info),
            ))

        return options


class HopByHopFactory(PacketFactory):
    """ Factory that produces HopByHop extension headers from data. """

    def __init__(self, hop_by_hop_options_factory):
        self._hop_by_hop_options_factory = hop_by_hop_options_factory

    def _calculate_extension_header_length(self, hdr_ext_len):
        return (hdr_ext_len + 1) * 8

    def parse(self, data, message_info):
        """ Parse one HopByHop header and add its length to message_info.payload_length. """
        next_header = ord(data.read(1))

        hdr_ext_len = ord(data.read(1))

        # Note! Two bytes were read (next_header and hdr_ext_len) so they must
        # be subtracted from header length
        hop_by_hop_length = (self._calculate_extension_header_length(hdr_ext_len) - 2)

        hop_by_hop_data = data.read(hop_by_hop_length)

        options = self._hop_by_hop_options_factory.parse(io.BytesIO(hop_by_hop_data), message_info)

        hop_by_hop = HopByHop(next_header, options, hdr_ext_len)

        message_info.payload_length += len(hop_by_hop)

        return hop_by_hop


class MPLOptionFactory(PacketFactory):
    """ Factory that produces MPL options for HopByHop extension header. """

    def parse(self, data, message_info):
        return MPLOption.from_bytes(data)


class UDPHeaderFactory:
    """ Factory that produces UDP header. """

    def parse(self, data, message_info):
        return UDPHeader.from_bytes(data)


class UdpBasedOnSrcDstPortsPayloadFactory:

    # TODO: Unittests
    """ Factory that produces UDP payload. """

    def __init__(self, src_dst_port_based_payload_factories):
        """
        Args:
            src_dst_port_based_payload_factories (dict[int: PacketFactory]):
                Mapping from port number to the factory that parses UDP
                payload for that source or destination port.
        """
        self._factories = src_dst_port_based_payload_factories

    def parse(self, data, message_info):
        """ Parse UDP payload with the factory registered for the message ports.

        A factory registered for the source port takes precedence over one
        registered for the destination port.

        Returns:
            Parsed payload, or None for traffic sent to common.UDP_TEST_PORT.

        Raises:
            RuntimeError: If no factory matches and the destination is not the test port.
        """
        factory = None

        if message_info.dst_port in self._factories:
            factory = self._factories[message_info.dst_port]

        if message_info.src_port in self._factories:
            factory = self._factories[message_info.src_port]

        if message_info.dst_port == common.UDP_TEST_PORT:
            # Ignore traffic targeted to test port
            return None
        elif factory is None:
            raise RuntimeError("Could not find factory to build UDP payload.")

        return factory.parse(data, message_info)


class UDPDatagramFactory(PacketFactory):

    # TODO: Unittests
    """ Factory that produces UDP datagrams. """

    def __init__(self, udp_header_factory, udp_payload_factory):
        self._udp_header_factory = udp_header_factory
        self._udp_payload_factory = udp_payload_factory

    def parse(self, data, message_info):
        """ Parse a UDP datagram; records ports and payload length in message_info. """
        header = self._udp_header_factory.parse(data, message_info)

        # Update message payload length: UDP header (8B) + payload length
        message_info.payload_length += len(header) + (len(data.getvalue()) - data.tell())

        message_info.src_port = header.src_port
        message_info.dst_port = header.dst_port

        payload = self._udp_payload_factory.parse(data, message_info)

        return UDPDatagram(header, payload)


class ICMPv6Factory(PacketFactory):
    """ Factory that produces ICMPv6 messages from data. """

    def __init__(self, body_factories=None):
        self._body_factories = (body_factories if body_factories is not None else {})

    def _get_icmpv6_body_factory(self, _type):
        """ Return the body factory for _type, falling back to the "default" entry if present. """
        try:
            return self._body_factories[_type]

        except KeyError:
            if "default" not in self._body_factories:
                raise RuntimeError("Could not find specialized factory to parse ICMP body. "
                                   "Unsupported ICMP type: {}".format(_type))

            default_factory = self._body_factories["default"]

            print("Could not find specialized factory to parse ICMP body. "
                  "Take the default one: {}".format(type(default_factory)))

            return default_factory

    def parse(self, data, message_info):
        """ Parse an ICMPv6 message; adds header and remaining data size to message_info.payload_length. """
        header = ICMPv6Header.from_bytes(data)

        factory = self._get_icmpv6_body_factory(header.type)

        message_info.payload_length += len(header) + (len(data.getvalue()) - data.tell())

        return ICMPv6(header, factory.parse(data, message_info))


class ICMPv6EchoBodyFactory(PacketFactory):
    """ Factory that produces ICMPv6 echo message body. """

    def parse(self, data, message_info):
        return ICMPv6EchoBody.from_bytes(data)


class BytesPayload(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing bytes payload. """

    def __init__(self, data):
        self.data = data

    def to_bytes(self):
        return bytearray(self.data)

    @classmethod
    def from_bytes(cls, data):
        return cls(data)

    def __len__(self):
        return len(self.data)


class BytesPayloadFactory(PacketFactory):
    """ Factory that produces bytes payload. """

    def parse(self, data, message_info):
        return BytesPayload(data.read())


class ICMPv6EchoBody(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing body of ICMPv6 echo messages. """

    _header_length = 4

    def __init__(self, identifier, sequence_number, data):
        self.identifier = identifier
        self.sequence_number = sequence_number
        self.data = data

    def to_bytes(self):
        data = struct.pack(">H", self.identifier)
        data += struct.pack(">H", self.sequence_number)
        data += self.data
        return data

    @classmethod
    def from_bytes(cls, data):
        """ Build a body from data (BytesIO): identifier, sequence number, then all remaining bytes. """
        identifier = struct.unpack(">H", data.read(2))[0]
        sequence_number = struct.unpack(">H", data.read(2))[0]

        return cls(identifier, sequence_number, data.read())

    def __len__(self):
        return self._header_length + len(self.data)


class ICMPv6DestinationUnreachableFactory(PacketFactory):
    """ Factory that produces ICMPv6 Destination Unreachable message body. """

    def parse(self, data, message_info):
        return ICMPv6DestinationUnreachable.from_bytes(data)


class ICMPv6DestinationUnreachable(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing body of ICMPv6 Destination Unreachable messages. """

    _header_length = 4
    _unused = 0

    def __init__(self, data):
        self.data = data

    def to_bytes(self):
        data = bytearray(struct.pack(">I", self._unused))
        data += self.data
        return data

    @classmethod
    def from_bytes(cls, data):
        """ Build a body from data (BytesIO).

        Raises:
            RuntimeError: If the leading 32-bit unused field is not zero.
        """
        unused = struct.unpack(">I", data.read(4))[0]
        if unused != 0:
            raise RuntimeError(
                "Invalid value of unused field in the ICMPv6 Destination Unreachable data. Expected value: 0.")

        return cls(bytearray(data.read()))

    def __len__(self):
        return self._header_length + len(self.data)