1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import io 31import random 32import string 33import struct 34import unittest 35 36from ipaddress import ip_address 37 38from ipv6 import ( 39 ICMPv6Header, 40 UDPHeader, 41 IPv6Header, 42 IPv6PacketFactory, 43 UDPDatagram, 44 UDPDatagramFactory, 45 ICMPv6Factory, 46 HopByHopFactory, 47 MPLOptionFactory, 48 ICMPv6, 49 HopByHopOptionHeader, 50 HopByHopOption, 51 HopByHop, 52 MPLOption, 53 IPv6Packet, 54 ICMPv6EchoBody, 55 BytesPayload, 56 ICMPv6EchoBodyFactory, 57 UpperLayerProtocol, 58 UDPHeaderFactory, 59 HopByHopOptionsFactory, 60 ICMPv6DestinationUnreachableFactory, 61 BytesPayloadFactory, 62 ICMPv6DestinationUnreachable, 63 UdpBasedOnSrcDstPortsPayloadFactory, 64 FragmentHeader, 65) 66 67import common 68 69 70class HopByHopOptionBytesValue: 71 """ Test helper class """ 72 73 _value = "value" 74 75 def __init__(self, _bytes): 76 self.bytes = _bytes 77 78 def to_bytes(self): 79 return self.bytes 80 81 def to_dict(self, d=None): 82 d = d if d is not None else dict() 83 84 d[self._value] = self.bytes 85 return d 86 87 def __len__(self): 88 return len(self.bytes) 89 90 91class ICMPv6BytesBody: 92 """ Test helper class """ 93 94 _icmp_body = "icmp_body" 95 96 def __init__(self, _bytes): 97 self.bytes = _bytes 98 99 def to_bytes(self): 100 return self.bytes 101 102 def to_dict(self, d=None): 103 104 d[self._icmp_body] = self.bytes 105 return d 106 107 def __len__(self): 108 return len(self.bytes) 109 110 111class ICMPv6BytesBodyFactory: 112 """ Test helper class """ 113 114 def parse(self, data, context): 115 return ICMPv6BytesBody(data.read()) 116 117 118class DummyHeader: 119 120 def __init__(self): 121 self.checksum = 0 122 123 124class DummyUpperLayerProtocol(UpperLayerProtocol): 125 126 def __init__(self, header, data, _type): 127 super(DummyUpperLayerProtocol, self).__init__(header) 128 self._data = data 129 self._type = _type 130 131 @property 132 def type(self): 133 return self._type 134 135 def to_bytes(self): 136 return self._data 137 138 def __len__(self): 139 return len(self._data) 140 141 142def any_uint(bits): 143 return random.randint(0, (1 << bits) - 1) 144 145 146def any_type(): 147 return any_uint(8) 148 149 150def any_code(): 151 return any_uint(8) 152 153 154def any_checksum(): 155 return any_uint(16) 156 157 158def any_fragment_offset(): 159 return any_uint(13) 160 161 162def any_bool(): 163 return (any_uint(1) == 1) 164 165 166def any_fragment_identification(): 167 return any_uint(32) 168 169 170def any_icmp_payload(_type, code, checksum, body): 171 return bytearray([_type, code, (checksum >> 8) & 0xff, checksum & 0xff]) + body 172 173 174def any_udp_payload(src_port, dst_port, payload, checksum): 175 payload_len = len(payload) + 8 176 return bytearray([(src_port >> 8) & 0xff, src_port & 0xff, (dst_port >> 8) & 0xff, dst_port & 0xff, 177 (payload_len >> 8) & 0xff, payload_len & 0xff, 178 (checksum >> 8) & 0xff, checksum & 0xff]) + payload 179 180 181def any_hop_by_hop_payload(next_header, hdr_ext_len, payload): 182 return bytearray([next_header, hdr_ext_len]) + payload 183 184 185def any_body(): 186 length = any_uint(8) 187 body = "".join([random.choice(string.ascii_letters + string.digits + string.hexdigits) for _ in range(length)]) 188 return bytearray(body.encode("utf-8")) 189 190 191def any_payload(): 192 length = any_uint(8) 193 payload = "".join([random.choice(string.printable) for _ in range(length)]) 194 return bytearray(payload.encode("utf-8")) 195 196 197def any_ip_address(): 198 return bytearray([0xfe, 0x80]) + bytearray([0x00] * 6) + bytearray([random.getrandbits(8)] * 8) 199 200 201def any_port(): 202 return any_uint(16) 203 204 205def any_mpl_opt_type(): 206 return any_uint(8) 207 208 209def any_mpl_opt_data_len(): 210 return any_uint(8) 211 212 213def any_mpl_S(): 214 return any_uint(2) 215 216 217def any_mpl_M(): 218 return any_uint(1) 219 220 221def any_mpl_V(): 222 return any_uint(1) 223 224 225def any_mpl_sequence(): 226 return any_uint(8) 227 228 229def any_mpl_seed_id(S): 230 length = MPLOption._seed_id_length[S] 231 seed_id = "".join([random.choice(string.ascii_letters + string.digits + string.hexdigits) for _ in range(length)]) 232 return bytearray(seed_id.encode("utf-8")) 233 234 235def any_next_header(): 236 return any_uint(8) 237 238 239def any_traffic_class(): 240 return any_uint(8) 241 242 243def any_flow_label(): 244 return any_uint(20) 245 246 247def any_hop_limit(): 248 return any_uint(8) 249 250 251def any_payload_length(): 252 return any_uint(16) 253 254 255def any_hdr_ext_len(): 256 return any_uint(3) 257 258 259def any_length(): 260 return any_uint(4) 261 262 263def any_str(length=8): 264 s = "".join(random.choice(string.printable) for _ in range(length)) 265 return s.encode("utf-8") 266 267 268def any_bytes(length=4): 269 return bytearray(any_str(length)) 270 271 272def any_dict(keys_count=4): 273 keys = [any_str() for _ in range(keys_count)] 274 275 d = {} 276 for key in keys: 277 d[key] = any_bytes() 278 279 return d 280 281 282def any_mpl_option(): 283 S = any_mpl_S() 284 M = any_mpl_M() 285 V = any_mpl_V() 286 sequence = any_mpl_sequence() 287 seed_id = any_mpl_seed_id(S) 288 289 return MPLOption(S, M, V, sequence, seed_id) 290 291 292def any_hop_by_hop_bytes_option_header(length=4): 293 _type = any_type() 294 295 # 0 or 1 means padding, so type have to be higher than 1 296 while _type <= 1: 297 _type = any_type() 298 299 return HopByHopOptionHeader(_type, length) 300 301 302def any_hop_by_hop_bytes_value(length=2): 303 return HopByHopOptionBytesValue(any_bytes(length)) 304 305 306def any_hop_by_hop_bytes_option(): 307 length = any_length() 308 return HopByHopOption(any_hop_by_hop_bytes_option_header(length), any_hop_by_hop_bytes_value(length)) 309 310 311def any_hop_by_hop_mpl_option(): 312 mpl_option = any_mpl_option() 313 return HopByHopOption(any_hop_by_hop_bytes_option_header(len(mpl_option)), mpl_option) 314 315 316def any_identifier(): 317 return any_uint(16) 318 319 320def any_sequence_number(): 321 return any_uint(16) 322 323 324def any_data(): 325 return any_bytes(random.randint(0, 32)) 326 327 328def any_upper_layer_payload(data, _type): 329 return DummyUpperLayerProtocol(DummyHeader(), data, _type) 330 331 332def any_extension_headers(): 333 return [] 334 335 336def any_message_info(): 337 return common.MessageInfo() 338 339 340class TestIPv6Header(unittest.TestCase): 341 342 def test_should_convert_IPv6_header_to_bytes_when_to_bytes_method_is_called(self): 343 # GIVEN 344 traffic_class = any_traffic_class() 345 flow_label = any_flow_label() 346 payload_length = any_payload_length() 347 next_header = any_next_header() 348 hop_limit = any_hop_limit() 349 source_address = any_ip_address() 350 destination_address = any_ip_address() 351 352 ipv6_header = IPv6Header(source_address, destination_address, traffic_class, flow_label, hop_limit, 353 payload_length, next_header) 354 355 # WHEN 356 data = ipv6_header.to_bytes() 357 358 # THEN 359 self.assertEqual(6, data[0] >> 4) 360 self.assertEqual(traffic_class, ((data[0] << 8 | data[1]) >> 4) & 0xff) 361 self.assertEqual(flow_label, ((data[1] & 0x0F) << 16) | (data[2] << 8) | data[3]) 362 self.assertEqual(payload_length, struct.unpack("!H", data[4:6])[0]) 363 self.assertEqual(next_header, data[6]) 364 self.assertEqual(hop_limit, data[7]) 365 self.assertEqual(source_address, data[8:24]) 366 self.assertEqual(destination_address, data[24:40]) 367 368 def test_should_create_IPv6Header_when_from_bytes_classmethod_is_called(self): 369 # GIVEN 370 traffic_class = any_traffic_class() 371 flow_label = any_flow_label() 372 payload_length = any_payload_length() 373 next_header = any_next_header() 374 hop_limit = any_hop_limit() 375 source_address = any_ip_address() 376 destination_address = any_ip_address() 377 378 data = bytearray([(6 << 4) | (traffic_class >> 4), (traffic_class & 0xF) << 4 | (flow_label >> 16) & 0xF, 379 (flow_label >> 8) & 0xff, flow_label & 0xff, payload_length >> 8, payload_length & 0xff, 380 next_header, hop_limit]) 381 data += ip_address(bytes(source_address)).packed + ip_address(bytes(destination_address)).packed 382 383 # WHEN 384 ipv6_header = IPv6Header.from_bytes(io.BytesIO(data)) 385 386 # THEN 387 self.assertEqual(6, ipv6_header.version) 388 self.assertEqual(traffic_class, ipv6_header.traffic_class) 389 self.assertEqual(flow_label, ipv6_header.flow_label) 390 self.assertEqual(payload_length, ipv6_header.payload_length) 391 self.assertEqual(next_header, ipv6_header.next_header) 392 self.assertEqual(hop_limit, ipv6_header.hop_limit) 393 self.assertEqual(source_address, ipv6_header.source_address.packed) 394 self.assertEqual(destination_address, ipv6_header.destination_address.packed) 395 396 def test_should_return_proper_header_length_when_IPv6Packet_object_is_called_in_len(self): 397 # GIVEN 398 ipv6_header = IPv6Header(any_traffic_class(), any_flow_label(), any_payload_length(), any_next_header(), 399 any_hop_limit(), any_ip_address(), any_ip_address()) 400 401 # WHEN 402 ipv6_header_length = len(ipv6_header) 403 404 # THEN 405 self.assertEqual(40, ipv6_header_length) 406 407 408class TestUDPHeader(unittest.TestCase): 409 410 def test_should_convert_UDP_header_to_bytes_when_to_bytes_method_is_called(self): 411 # GIVEN 412 src_port = any_port() 413 dst_port = any_port() 414 payload_length = any_payload_length() 415 checksum = any_checksum() 416 417 udp_header = UDPHeader(src_port, dst_port, payload_length, checksum) 418 419 # WHEN 420 data = udp_header.to_bytes() 421 422 # THEN 423 self.assertEqual(src_port, struct.unpack("!H", data[0:2])[0]) 424 self.assertEqual(dst_port, struct.unpack("!H", data[2:4])[0]) 425 self.assertEqual(payload_length, struct.unpack("!H", data[4:6])[0]) 426 self.assertEqual(checksum, struct.unpack("!H", data[6:])[0]) 427 428 def test_should_create_UDPHeader_when_from_bytes_classmethod_is_called(self): 429 # GIVEN 430 src_port = any_port() 431 dst_port = any_port() 432 payload_length = any_payload_length() 433 checksum = any_checksum() 434 435 data = struct.pack("!H", src_port) + struct.pack("!H", dst_port) + \ 436 struct.pack("!H", payload_length) + struct.pack("!H", checksum) 437 438 # WHEN 439 udp_header = UDPHeader.from_bytes(io.BytesIO(data)) 440 441 # THEN 442 self.assertEqual(src_port, udp_header.src_port) 443 self.assertEqual(dst_port, udp_header.dst_port) 444 self.assertEqual(payload_length, udp_header.payload_length) 445 self.assertEqual(checksum, udp_header.checksum) 446 447 def test_should_return_proper_header_length_when_UDPHeader_object_is_called_in_len(self): 448 # GIVEN 449 udp_header = UDPHeader(any_port(), any_port(), any_payload_length(), any_checksum()) 450 451 # WHEN 452 udp_header_length = len(udp_header) 453 454 # THEN 455 self.assertEqual(8, udp_header_length) 456 457 def test_should_return_17_when_type_property_is_called(self): 458 # GIVEN 459 udp_header = UDPHeader(any_port(), any_port(), any_payload_length(), any_checksum()) 460 461 # THEN 462 self.assertEqual(17, udp_header.type) 463 464 465class TestICMPv6Header(unittest.TestCase): 466 467 def test_should_convert_icmp_message_header_to_bytes_when_to_bytes_method_is_called(self): 468 # GIVEN 469 _type = any_type() 470 code = any_code() 471 checksum = any_checksum() 472 473 icmpv6_header = ICMPv6Header(_type, code, checksum) 474 475 # WHEN 476 data = icmpv6_header.to_bytes() 477 478 # THEN 479 self.assertEqual(_type, data[0]) 480 self.assertEqual(code, data[1]) 481 self.assertEqual(checksum, struct.unpack("!H", data[2:])[0]) 482 483 def test_should_create_ICMPv6Header_when_to_bytes_classmethod_is_called(self): 484 # GIVEN 485 _type = any_type() 486 code = any_code() 487 checksum = any_checksum() 488 489 data = bytearray([_type, code]) + struct.pack("!H", checksum) 490 491 # WHEN 492 icmpv6_header = ICMPv6Header.from_bytes(io.BytesIO(data)) 493 494 # THEN 495 self.assertEqual(_type, icmpv6_header.type) 496 self.assertEqual(code, icmpv6_header.code) 497 self.assertEqual(checksum, icmpv6_header.checksum) 498 499 def test_should_return_proper_header_length_when_ICMPv6Header_object_is_called_in_len(self): 500 # GIVEN 501 icmpv6_header = ICMPv6Header(any_type(), any_code(), any_checksum()) 502 503 # WHEN 504 icmpv6_header_length = len(icmpv6_header) 505 506 # THEN 507 self.assertEqual(4, icmpv6_header_length) 508 509 510class TestIPv6Packet(unittest.TestCase): 511 512 def test_should_build_IPv6Packet_with_ICMP_payload_from_well_know_values_when_to_bytes_method_is_called(self): 513 # GIVEN 514 515 ipv6_packet = IPv6Packet( 516 IPv6Header(source_address="fd00:1234:4555::ff:fe00:1800", destination_address="ff03::1"), 517 ICMPv6( 518 ICMPv6Header(128, 0), 519 ICMPv6EchoBody( 520 0, 2, 521 bytearray([ 522 0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 523 0x41, 0x41 524 ]))), [ 525 HopByHop(options=[ 526 HopByHopOption(HopByHopOptionHeader( 527 _type=0x6d), MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytearray([0x00, 0x18]))) 528 ]) 529 ]) 530 531 # WHEN 532 ipv6_packet_bytes = ipv6_packet.to_bytes() 533 534 # THEN 535 expected_ipv6_packet_bytes = bytearray([ 536 0x60, 0x00, 0x00, 0x00, 0x00, 0x22, 0x00, 0x40, 0xfd, 0x00, 0x12, 0x34, 0x45, 0x55, 0x00, 0x00, 0x00, 0x00, 537 0x00, 0xff, 0xfe, 0x00, 0x18, 0x00, 0xff, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 538 0x00, 0x00, 0x00, 0x01, 0x3a, 0x00, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18, 0x80, 0x00, 0x87, 0x12, 0x00, 0x00, 539 0x00, 0x02, 0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 540 0x41, 0x41 541 ]) 542 543 self.assertEqual(expected_ipv6_packet_bytes, ipv6_packet_bytes) 544 545 def test_should_build_IPv6Packet_with_UDP_payload_from_well_know_values_when_to_bytes_method_is_called(self): 546 # GIVEN 547 ipv6_header = IPv6Header(source_address="fe80::1", destination_address="ff02::2", hop_limit=255) 548 549 udp_dgram = UDPDatagram( 550 UDPHeader(src_port=19788, dst_port=19788), 551 BytesPayload( 552 bytearray([ 553 0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03, 554 0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80, 0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef 555 ]))) 556 557 ipv6_packet = IPv6Packet(ipv6_header, udp_dgram) 558 559 # WHEN 560 ipv6_packet_bytes = ipv6_packet.to_bytes() 561 562 # THEN 563 expected_ipv6_packet_bytes = bytearray([ 564 0x60, 0x00, 0x00, 0x00, 0x00, 0x28, 0x11, 0xff, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 565 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 566 0x00, 0x00, 0x00, 0x02, 0x4d, 0x4c, 0x4d, 0x4c, 0x00, 0x28, 0xe9, 0xf4, 0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 567 0x00, 0x00, 0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03, 0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80, 568 0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef 569 ]) 570 571 self.assertEqual(expected_ipv6_packet_bytes, ipv6_packet_bytes) 572 573 574class TestIPv6PacketFactory(unittest.TestCase): 575 576 def test_should_create_IPv6Packet_with_MPL_and_ICMP_when_to_bytes_method_is_called(self): 577 # GIVEN 578 ipv6_packet_bytes = bytearray([ 579 0x60, 0x00, 0x00, 0x00, 0x00, 0x22, 0x00, 0x40, 0xfd, 0x00, 0x12, 0x34, 0x45, 0x55, 0x00, 0x00, 0x00, 0x00, 580 0x00, 0xff, 0xfe, 0x00, 0x18, 0x00, 0xff, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 581 0x00, 0x00, 0x00, 0x01, 0x3a, 0x00, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18, 0x80, 0x00, 0x87, 0x12, 0x00, 0x00, 582 0x00, 0x02, 0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 583 0x41, 0x41 584 ]) 585 586 ipv6_factory = IPv6PacketFactory(ehf={ 587 0: 588 HopByHopFactory(hop_by_hop_options_factory=HopByHopOptionsFactory( 589 options_factories={109: MPLOptionFactory()})) 590 }, 591 ulpf={58: ICMPv6Factory(body_factories={128: ICMPv6EchoBodyFactory()})}) 592 593 # WHEN 594 ipv6_packet = ipv6_factory.parse(io.BytesIO(ipv6_packet_bytes), any_message_info()) 595 596 # THEN 597 self.assertEqual('fd00:1234:4555::ff:fe00:1800', ipv6_packet.ipv6_header.source_address.compressed) 598 self.assertEqual('ff03::1', ipv6_packet.ipv6_header.destination_address.compressed) 599 self.assertEqual(64, ipv6_packet.ipv6_header.hop_limit) 600 self.assertEqual(0, ipv6_packet.ipv6_header.next_header) 601 self.assertEqual(34, ipv6_packet.ipv6_header.payload_length) 602 self.assertEqual(0, ipv6_packet.ipv6_header.flow_label) 603 self.assertEqual(0, ipv6_packet.ipv6_header.traffic_class) 604 self.assertEqual(6, ipv6_packet.ipv6_header.version) 605 606 self.assertEqual(1, ipv6_packet.extension_headers[0].options[0].value.S) 607 self.assertEqual(0, ipv6_packet.extension_headers[0].options[0].value.M) 608 self.assertEqual(0, ipv6_packet.extension_headers[0].options[0].value.V) 609 self.assertEqual(2, ipv6_packet.extension_headers[0].options[0].value.sequence) 610 self.assertEqual(bytearray([0x00, 0x18]), ipv6_packet.extension_headers[0].options[0].value.seed_id) 611 612 self.assertEqual(34578, ipv6_packet.upper_layer_protocol.header.checksum) 613 self.assertEqual(128, ipv6_packet.upper_layer_protocol.header.type) 614 self.assertEqual(0, ipv6_packet.upper_layer_protocol.header.code) 615 self.assertEqual(0, ipv6_packet.upper_layer_protocol.body.identifier) 616 self.assertEqual(2, ipv6_packet.upper_layer_protocol.body.sequence_number) 617 self.assertEqual(b'\x80\x00\xc7\xbf\x00\x00\x00\x01AAAAAAAAAA', ipv6_packet.upper_layer_protocol.body.data) 618 619 def test_should_create_IPv6Packet_without_any_extension_header_with_ICMP_when_to_bytes_method_is_called(self): 620 # GIVEN 621 ipv6_packet_bytes = bytearray([ 622 0x60, 0x00, 0x00, 0x00, 0x00, 0x1A, 0x3A, 0x40, 0xfd, 0x00, 0x12, 0x34, 0x45, 0x55, 0x00, 0x00, 0x00, 0x00, 623 0x00, 0xff, 0xfe, 0x00, 0x18, 0x00, 0xff, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 624 0x00, 0x00, 0x00, 0x01, 0x80, 0x00, 0x87, 0x12, 0x00, 0x00, 0x00, 0x02, 0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 625 0x00, 0x01, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41 626 ]) 627 628 ipv6_factory = IPv6PacketFactory(ulpf={58: ICMPv6Factory(body_factories={128: ICMPv6EchoBodyFactory()})}) 629 630 # WHEN 631 ipv6_packet = ipv6_factory.parse(io.BytesIO(ipv6_packet_bytes), any_message_info()) 632 ipv6_packet._validate_checksum() 633 634 # THEN 635 self.assertEqual('fd00:1234:4555::ff:fe00:1800', ipv6_packet.ipv6_header.source_address.compressed) 636 self.assertEqual('ff03::1', ipv6_packet.ipv6_header.destination_address.compressed) 637 self.assertEqual(64, ipv6_packet.ipv6_header.hop_limit) 638 self.assertEqual(58, ipv6_packet.ipv6_header.next_header) 639 self.assertEqual(26, ipv6_packet.ipv6_header.payload_length) 640 self.assertEqual(0, ipv6_packet.ipv6_header.flow_label) 641 self.assertEqual(0, ipv6_packet.ipv6_header.traffic_class) 642 self.assertEqual(6, ipv6_packet.ipv6_header.version) 643 644 self.assertEqual(34578, ipv6_packet.upper_layer_protocol.header.checksum) 645 self.assertEqual(128, ipv6_packet.upper_layer_protocol.header.type) 646 self.assertEqual(0, ipv6_packet.upper_layer_protocol.header.code) 647 self.assertEqual(0, ipv6_packet.upper_layer_protocol.body.identifier) 648 self.assertEqual(2, ipv6_packet.upper_layer_protocol.body.sequence_number) 649 self.assertEqual(b'\x80\x00\xc7\xbf\x00\x00\x00\x01AAAAAAAAAA', ipv6_packet.upper_layer_protocol.body.data) 650 651 def test_should_set_message_info_field_when_to_bytes_method_is_called(self): 652 # GIVEN 653 ipv6_packet_data = bytearray([ 654 0x60, 0x00, 0x00, 0x00, 0x00, 0x1A, 0x3A, 0x40, 0xfd, 0x00, 0x12, 0x34, 0x45, 0x55, 0x00, 0x00, 0x00, 0x00, 655 0x00, 0xff, 0xfe, 0x00, 0x18, 0x00, 0xff, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 656 0x00, 0x00, 0x00, 0x01, 0x80, 0x00, 0x87, 0x12, 0x00, 0x00, 0x00, 0x02, 0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 657 0x00, 0x01, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41 658 ]) 659 660 message_info = any_message_info() 661 message_info.source_ipv6 = "ff::" 662 message_info.destination_address = "ff::" 663 664 factory = IPv6PacketFactory(ulpf={58: ICMPv6Factory(body_factories={128: ICMPv6EchoBodyFactory()})}) 665 666 # WHEN 667 factory.parse(io.BytesIO(ipv6_packet_data), message_info) 668 669 # THEN 670 self.assertEqual("fd00:1234:4555::ff:fe00:1800", message_info.source_ipv6.compressed) 671 self.assertEqual("ff03::1", message_info.destination_ipv6.compressed) 672 673 674class TestUDPDatagram(unittest.TestCase): 675 676 def test_should_creates_bytes_from_UDPHeader_and_payload_when_to_bytes_method_is_called(self): 677 # GIVEN 678 src_port = any_port() 679 dst_port = any_port() 680 checksum = any_checksum() 681 682 payload = any_payload() 683 payload_length = len(payload) + 8 # UDP length consists of UDP header length and payload length 684 685 udp_header = UDPHeader(src_port, dst_port, payload_length, checksum) 686 udp_payload = BytesPayload(payload) 687 udp_dgram = UDPDatagram(udp_header, udp_payload) 688 689 # WHEN 690 udp_dgram_bytes = udp_dgram.to_bytes() 691 692 # THEN 693 expected_udp_dgram_bytes = struct.pack("!H", src_port) + struct.pack("!H", dst_port) + \ 694 struct.pack("!H", payload_length) + struct.pack("!H", checksum) + payload 695 696 self.assertEqual(expected_udp_dgram_bytes, udp_dgram_bytes) 697 698 699class TestIPv6FragmentHeader(unittest.TestCase): 700 701 def test_shold_convert_IPv6_fragment_header_to_bytes_when_to_bytes_method_is_called(self): 702 # GIVEN 703 type = any_type() 704 offset = any_fragment_offset() 705 more_flag = any_bool() 706 identification = any_fragment_identification() 707 708 ipv6_fragment_header = FragmentHeader(type, offset, more_flag, identification) 709 710 # WHEN 711 actual = ipv6_fragment_header.to_bytes() 712 713 # THEN 714 expected = bytearray([type, 0x00, offset >> 5, ((offset << 3) & 0xff) | more_flag])\ 715 + struct.pack("!I", identification) 716 717 self.assertEqual(expected, actual) 718 719 def test_should_create_FragmentHeader_when_from_bytes_classmethod_is_called(self): 720 # GIVEN 721 type = any_type() 722 offset = any_fragment_offset() 723 more_flag = any_bool() 724 identification = any_fragment_identification() 725 726 data = bytearray([type, 0x00, offset >> 5, ((offset << 3) & 0xff) | more_flag])\ 727 + struct.pack("!I", identification) 728 729 # WHEN 730 ipv6_fragment_header = FragmentHeader.from_bytes(io.BytesIO(data)) 731 732 # THEN 733 self.assertEqual(type, ipv6_fragment_header.next_header) 734 self.assertEqual(offset, ipv6_fragment_header.offset) 735 self.assertEqual(more_flag, ipv6_fragment_header.more_flag) 736 self.assertEqual(identification, ipv6_fragment_header.identification) 737 738 739class TestICMPv6(unittest.TestCase): 740 741 def test_should_creates_bytes_from_ICMPv6Header_and_body_when_to_bytes_method_is_called(self): 742 # GIVEN 743 _type = any_type() 744 code = any_code() 745 checksum = any_checksum() 746 body = any_body() 747 748 icmpv6_header = ICMPv6Header(_type, code, checksum) 749 icmpv6_body = ICMPv6BytesBody(body) 750 icmpv6_msg = ICMPv6(icmpv6_header, icmpv6_body) 751 752 # WHEN 753 actual = icmpv6_msg.to_bytes() 754 755 # THEN 756 expected = bytearray([_type, code]) + struct.pack("!H", checksum) + body 757 758 self.assertEqual(expected, actual) 759 760 761class TestHopByHop(unittest.TestCase): 762 763 def _calculate_hdr_ext_len(self, payload_len): 764 count = payload_len // 8 765 rest = payload_len % 8 766 767 if rest != 0: 768 count += 1 769 770 if count == 0 and rest == 0: 771 return count 772 773 return count - 1 774 775 def _calculate_required_padding(self, content_length): 776 excess_bytes = content_length & 0x7 777 778 if excess_bytes > 0: 779 return 8 - excess_bytes 780 781 return 0 782 783 def create_padding(self, padding_length): 784 if padding_length == 1: 785 return bytearray([0x00]) 786 elif padding_length > 1: 787 padding_length -= 2 788 return bytearray([0x01, padding_length]) + bytearray([0x00 for _ in range(padding_length)]) 789 else: 790 return bytearray() 791 792 def test_should_create_bytes_from_HopByHop_when_to_bytes_method_is_called(self): 793 # GIVEN 794 next_header = any_next_header() 795 hop_by_hop_option = any_hop_by_hop_bytes_option() 796 hdr_ext_len = self._calculate_hdr_ext_len(2 + len(hop_by_hop_option)) 797 798 hop_by_hop = HopByHop(next_header, [hop_by_hop_option]) 799 800 # WHEN 801 data = hop_by_hop.to_bytes() 802 803 # THEN 804 expected_data = bytearray([next_header, hdr_ext_len]) + hop_by_hop_option.to_bytes() 805 padding_length = self._calculate_required_padding(len(expected_data)) 806 expected_data += self.create_padding(padding_length) 807 808 self.assertEqual(expected_data, data) 809 810 811class TestMPLOption(unittest.TestCase): 812 813 def test_should_convert_MPLOption_to_bytes_when_to_bytes_method_is_called(self): 814 # GIVEN 815 S = any_mpl_S() 816 M = any_mpl_M() 817 V = any_mpl_V() 818 sequence = any_mpl_sequence() 819 seed_id = any_mpl_seed_id(S) 820 821 mpl_option = MPLOption(S, M, V, sequence, seed_id) 822 823 # WHEN 824 data = mpl_option.to_bytes() 825 826 # THEN 827 expected_data = bytearray([(S << 6) | (M << 5) | (V << 4), sequence]) + seed_id 828 self.assertEqual(expected_data, data) 829 830 def test_should_create_MPLOption_when_to_bytes_method_is_called_with_data(self): 831 # GIVEN 832 S = any_mpl_S() 833 M = any_mpl_M() 834 V = any_mpl_V() 835 sequence = any_mpl_sequence() 836 seed_id = any_mpl_seed_id(S) 837 838 data = bytearray([(S << 6) | (M << 5) | (V << 4), sequence]) + seed_id 839 840 # WHEN 841 mpl_option = MPLOption.from_bytes(io.BytesIO(data)) 842 843 # THEN 844 self.assertEqual(S, mpl_option.S) 845 self.assertEqual(M, mpl_option.M) 846 self.assertEqual(V, mpl_option.V) 847 self.assertEqual(sequence, mpl_option.sequence) 848 self.assertEqual(seed_id, mpl_option.seed_id) 849 850 def test_check_if_mpl_seed_id_length_values_was_not_changed(self): 851 self.assertEqual(0, MPLOption._seed_id_length[0]) 852 self.assertEqual(2, MPLOption._seed_id_length[1]) 853 self.assertEqual(8, MPLOption._seed_id_length[2]) 854 self.assertEqual(16, MPLOption._seed_id_length[3]) 855 856 def test_should_return_proper_length_when_len_is_called_with_mpl_option_object(self): 857 # GIVEN 858 S = any_mpl_S() 859 M = any_mpl_M() 860 V = any_mpl_V() 861 sequence = any_mpl_sequence() 862 seed_id = any_mpl_seed_id(S) 863 864 mpl_option = MPLOption(S, M, V, sequence, seed_id) 865 866 # WHEN 867 mpl_option_length = len(mpl_option) 868 869 # THEN 870 SMV_and_sequence_length = 2 871 self.assertEqual(SMV_and_sequence_length + len(seed_id), mpl_option_length) 872 873 874class TestclassHopByHopOption(unittest.TestCase): 875 876 def test_should_convert_HopByHopOption_to_bytes_when_to_bytes_method_is_called(self): 877 # GIVEN 878 length = any_length() 879 header = any_hop_by_hop_bytes_option_header(length) 880 value = any_hop_by_hop_bytes_value(length) 881 882 hop_by_hop_option = HopByHopOption(header, value) 883 884 # WHEN 885 data = hop_by_hop_option.to_bytes() 886 887 # THEN 888 expected_data = header.to_bytes() + value.to_bytes() 889 self.assertEqual(expected_data, data) 890 891 def test_should_return_length_of_HopByHopOption_when_len_is_called_with_hop_by_hop_option_object(self): 892 # GIVEN 893 length = any_length() 894 header = any_hop_by_hop_bytes_option_header(length) 895 value = any_hop_by_hop_bytes_value(length) 896 897 hop_by_hop_option = HopByHopOption(header, value) 898 899 # WHEN 900 hop_by_hop_option_length = len(hop_by_hop_option) 901 902 # THEN 903 header_length = 2 904 expected_hop_by_hop_option_length = header_length + length 905 self.assertEqual(expected_hop_by_hop_option_length, hop_by_hop_option_length) 906 907 908class TestHopByHopOptionHeader(unittest.TestCase): 909 910 def test_should_convert_HopByHopOptionHeader_to_bytes_when_to_bytes_method_is_called(self): 911 # GIVEN 912 _type = any_type() 913 length = any_length() 914 915 hop_by_hop_option_header = HopByHopOptionHeader(_type, length) 916 917 # WHEN 918 data = hop_by_hop_option_header.to_bytes() 919 920 # THEN 921 expected_data = bytearray([_type, length]) 922 self.assertEqual(expected_data, data) 923 924 def test_should_create_HopByHopOptionHeader_when_to_bytes_method_is_called_with_data(self): 925 # GIVEN 926 _type = any_type() 927 length = any_length() 928 929 data = bytearray([_type, length]) 930 931 # WHEN 932 option_header = HopByHopOptionHeader.from_bytes(io.BytesIO(data)) 933 934 # THEN 935 self.assertEqual(_type, option_header.type) 936 self.assertEqual(length, option_header.length) 937 938 def test_should_return_proper_length_when_len_is_called_with_HopByHopOptionHeader_object(self): 939 # GIVEN 940 _type = any_type() 941 length = any_length() 942 943 option_header = HopByHopOptionHeader(_type, length) 944 945 # WHEN 946 option_header_length = len(option_header) 947 948 # THEN 949 expected_option_header_length = 2 950 self.assertEqual(expected_option_header_length, option_header_length) 951 952 953class TestHopByHopFactory(unittest.TestCase): 954 955 def _calculate_hdr_ext_len(self, payload_length): 956 count = payload_length >> 3 957 958 if (payload_length & 0x7) == 0 and count > 0: 959 return count - 1 960 961 return count 962 963 def padding(self, content_length): 964 excess_bytes = content_length & 0x7 965 966 if excess_bytes > 0: 967 padding_length = 8 - excess_bytes 968 969 if padding_length == 1: 970 return bytearray([0x00]) 971 elif padding_length > 1: 972 padding_length -= 2 973 return bytearray([0x01, padding_length]) + bytearray([0x00 for _ in range(padding_length)]) 974 975 return bytearray() 976 977 def test_should_create_HopByHop_object_instance_when_to_bytes_method_is_called_with_data(self): 978 # GIVEN 979 hop_by_hop_option = any_hop_by_hop_mpl_option() 980 hop_by_hop_option_type = hop_by_hop_option.header.type 981 982 next_header = any_next_header() 983 hdr_ext_len = self._calculate_hdr_ext_len(2 + len(hop_by_hop_option)) 984 985 hop_by_hop_factory = HopByHopFactory(hop_by_hop_options_factory=HopByHopOptionsFactory( 986 options_factories={hop_by_hop_option_type: MPLOptionFactory()})) 987 988 data = bytearray([next_header, hdr_ext_len]) + hop_by_hop_option.to_bytes() 989 data += self.padding(len(data)) 990 991 # WHEN 992 hop_by_hop = hop_by_hop_factory.parse(io.BytesIO(data), any_message_info()) 993 994 # THEN 995 self.assertEqual(hop_by_hop_option.value.S, hop_by_hop.options[0].value.S) 996 self.assertEqual(hop_by_hop_option.value.V, hop_by_hop.options[0].value.V) 997 self.assertEqual(hop_by_hop_option.value.M, hop_by_hop.options[0].value.M) 998 self.assertEqual(hop_by_hop_option.value.sequence, hop_by_hop.options[0].value.sequence) 999 self.assertEqual(hop_by_hop_option.value.seed_id, hop_by_hop.options[0].value.seed_id) 1000 1001 def test_should_raise_RuntimeError_when_no_option_factory_is_set_and_parse_method_is_called(self): 1002 # GIVEN 1003 hop_by_hop_option = any_hop_by_hop_mpl_option() 1004 1005 next_header = any_next_header() 1006 hdr_ext_len = self._calculate_hdr_ext_len(2 + len(hop_by_hop_option)) 1007 1008 hop_by_hop_factory = HopByHopFactory(hop_by_hop_options_factory=HopByHopOptionsFactory()) 1009 1010 data = bytes([next_header, hdr_ext_len]) + hop_by_hop_option.to_bytes() 1011 data += self.padding(len(data)) 1012 1013 # THEN 1014 self.assertRaises(RuntimeError, hop_by_hop_factory.parse, io.BytesIO(data), any_message_info()) 1015 1016 1017class TestMPLOptionFactory(unittest.TestCase): 1018 1019 def test_should_produce_MPLOption_from_bytes_when_to_bytes_method_is_called_with_data(self): 1020 # GIVEN 1021 S = any_mpl_S() 1022 M = any_mpl_M() 1023 V = any_mpl_V() 1024 sequence = any_mpl_sequence() 1025 seed_id = any_mpl_seed_id(S) 1026 1027 SMV = (S << 6) | (M << 5) | (V << 4) 1028 data = bytearray([SMV, sequence]) + seed_id 1029 1030 factory = MPLOptionFactory() 1031 1032 # WHEN 1033 mpl_opt = factory.parse(io.BytesIO(data), any_message_info()) 1034 1035 # THEN 1036 self.assertEqual(mpl_opt.S, S) 1037 self.assertEqual(mpl_opt.M, M) 1038 self.assertEqual(mpl_opt.V, V) 1039 self.assertEqual(mpl_opt.sequence, sequence) 1040 self.assertEqual(mpl_opt.seed_id, seed_id) 1041 1042 1043class TestUdpBasedOnSrcDstPortsPayloadFactory(unittest.TestCase): 1044 1045 def test_should_create_payload_from_data_when_src_port_factory_is_defined_and_parse_method_is_called(self): 1046 # GIVEN 1047 data = any_data() 1048 1049 message_info = common.MessageInfo() 1050 message_info.src_port = any_port() 1051 message_info.dst_port = any_port() 1052 1053 factory = UdpBasedOnSrcDstPortsPayloadFactory( 1054 src_dst_port_based_payload_factories={message_info.src_port: BytesPayloadFactory()}) 1055 1056 # WHEN 1057 actual_data = factory.parse(io.BytesIO(data), message_info) 1058 1059 # THEN 1060 self.assertEqual(data, actual_data.data) 1061 1062 def test_should_create_payload_from_data_when_dst_port_factory_is_defined_and_parse_method_is_called(self): 1063 # GIVEN 1064 data = any_data() 1065 1066 message_info = common.MessageInfo() 1067 message_info.src_port = any_port() 1068 message_info.dst_port = any_port() 1069 1070 factory = UdpBasedOnSrcDstPortsPayloadFactory( 1071 src_dst_port_based_payload_factories={message_info.dst_port: BytesPayloadFactory()}) 1072 1073 # WHEN 1074 actual_data = factory.parse(io.BytesIO(data), message_info) 1075 1076 # THEN 1077 self.assertEqual(data, actual_data.data) 1078 1079 def test_should_raise_RuntimeError_when_parse_method_is_called_but_required_factory_is_not_defined(self): 1080 # GIVEN 1081 data = any_data() 1082 1083 message_info = common.MessageInfo() 1084 message_info.src_port = any_port() 1085 message_info.dst_port = any_port() 1086 1087 factory = UdpBasedOnSrcDstPortsPayloadFactory(src_dst_port_based_payload_factories={}) 1088 1089 # THEN 1090 self.assertRaises(RuntimeError, factory.parse, io.BytesIO(data), message_info) 1091 1092 1093class TestUDPDatagramFactory(unittest.TestCase): 1094 1095 def test_should_produce_UDPDatagram_from_bytes_when_to_bytes_method_is_called_with_data(self): 1096 # GIVEN 1097 src_port = any_port() 1098 dst_port = any_port() 1099 checksum = any_checksum() 1100 1101 payload = any_payload() 1102 payload_length = len(payload) + len(UDPHeader(0, 0)) 1103 1104 data = bytearray([(src_port >> 8), (src_port & 0xff), (dst_port >> 8), 1105 (dst_port & 0xff), (payload_length >> 8), (payload_length & 0xff), (checksum >> 8), 1106 (checksum & 0xff)]) + payload 1107 1108 factory = UDPDatagramFactory(UDPHeaderFactory(), BytesPayloadFactory()) 1109 1110 # WHEN 1111 udp_dgram = factory.parse(io.BytesIO(data), any_message_info()) 1112 1113 # THEN 1114 self.assertEqual(udp_dgram.header.src_port, src_port) 1115 self.assertEqual(udp_dgram.header.dst_port, dst_port) 1116 self.assertEqual(udp_dgram.header.payload_length, payload_length) 1117 self.assertEqual(udp_dgram.header.checksum, checksum) 1118 self.assertEqual(udp_dgram.payload.data, payload) 1119 1120 def test_should_set_src_and_dst_port_in_message_info_when_parse_method_is_called(self): 1121 # GIVEN 1122 message_info = any_message_info() 1123 1124 src_port = any_port() 1125 dst_port = any_port() 1126 checksum = any_checksum() 1127 1128 payload = any_payload() 1129 payload_length = len(payload) + len(UDPHeader(0, 0)) 1130 1131 data = (bytearray([ 1132 (src_port >> 8), 1133 (src_port & 0xff), 1134 (dst_port >> 8), 1135 (dst_port & 0xff), 1136 (payload_length >> 8), 1137 (payload_length & 0xff), 1138 (checksum >> 8), 1139 (checksum & 0xff), 1140 ]) + payload) 1141 1142 factory = UDPDatagramFactory(UDPHeaderFactory(), BytesPayloadFactory()) 1143 1144 # WHEN 1145 factory.parse(io.BytesIO(data), message_info) 1146 1147 # THEN 1148 self.assertEqual(src_port, message_info.src_port) 1149 self.assertEqual(dst_port, message_info.dst_port) 1150 1151 1152class TestICMPv6Factory(unittest.TestCase): 1153 1154 def test_should_produce_ICMPv6_from_bytes_when_to_bytes_method_is_called_with_data(self): 1155 # GIVEN 1156 _type = any_type() 1157 code = any_code() 1158 checksum = any_checksum() 1159 body = any_body() 1160 1161 data = bytearray([_type, code, (checksum >> 8), (checksum & 0xff)]) + body 1162 1163 factory = ICMPv6Factory(body_factories={_type: ICMPv6BytesBodyFactory()}) 1164 1165 # WHEN 1166 icmpv6_msg = factory.parse(io.BytesIO(data), any_message_info()) 1167 1168 # THEN 1169 self.assertEqual(icmpv6_msg.header.type, _type) 1170 self.assertEqual(icmpv6_msg.header.code, code) 1171 self.assertEqual(icmpv6_msg.header.checksum, checksum) 1172 self.assertEqual(icmpv6_msg.body.bytes, body) 1173 1174 def test_should_raise_RuntimeError_when_method_parse_is_called_but_body_factory_is_not_present(self): 1175 # GIVEN 1176 _type = any_type() 1177 code = any_code() 1178 checksum = any_checksum() 1179 body = any_body() 1180 1181 data = bytes([_type, code, (checksum >> 8), (checksum & 0xff)]) + body 1182 1183 factory = ICMPv6Factory() 1184 1185 # WHEN 1186 self.assertRaises(RuntimeError, factory.parse, io.BytesIO(data), any_message_info()) 1187 1188 1189class TestBytesPayload(unittest.TestCase): 1190 1191 def test_should_create_BytesPayload_when_from_bytes_class_method_is_called(self): 1192 # GIVEN 1193 data = any_data() 1194 1195 # WHEN 1196 actual = BytesPayload.from_bytes(data) 1197 1198 # THEN 1199 self.assertEqual(data, actual.data) 1200 1201 def test_should_return_exactly_the_same_data_as_passed_to_constructor_when_to_bytes_method_is_called(self): 1202 # GIVEN 1203 data = any_data() 1204 payload = BytesPayload(data) 1205 1206 # WHEN 1207 actual = payload.to_bytes() 1208 1209 # THEN 1210 self.assertEqual(data, actual) 1211 1212 def test_should_return_the_same_len_as_data_passed_to_constructor_when_len_is_called_on_BytesPayload_object(self): 1213 # GIVEN 1214 data = any_data() 1215 payload = BytesPayload(data) 1216 1217 # WHEN 1218 actual = len(payload) 1219 1220 # THEN 1221 self.assertEqual(len(data), actual) 1222 1223 1224class TestICMPv6EchoBody(unittest.TestCase): 1225 1226 def test_convert_ICMPv6_echo_body_to_data_when_to_bytes_method_is_called(self): 1227 # GIVEN 1228 identifier = any_identifier() 1229 sequence_number = any_sequence_number() 1230 data = any_data() 1231 1232 body = ICMPv6EchoBody(identifier, sequence_number, data) 1233 1234 # WHEN 1235 actual = body.to_bytes() 1236 1237 # THEN 1238 expected = bytearray([identifier >> 8, identifier & 0xff, sequence_number >> 8, sequence_number & 0xff]) + data 1239 self.assertEqual(expected, actual) 1240 1241 def test_should_create_ICMPv6EchoBody_from_data_when_from_bytes_classmethod_is_called(self): 1242 # GIVEN 1243 identifier = any_identifier() 1244 sequence_number = any_sequence_number() 1245 body_data = any_data() 1246 1247 data = bytearray([(identifier >> 8), (identifier & 0xff), (sequence_number >> 8), (sequence_number & 0xff)]) 1248 data += body_data 1249 1250 # WHEN 1251 actual = ICMPv6EchoBody.from_bytes(io.BytesIO(data)) 1252 1253 # THEN 1254 self.assertEqual(identifier, actual.identifier) 1255 self.assertEqual(sequence_number, actual.sequence_number) 1256 self.assertEqual(body_data, actual.data) 1257 1258 def test_should_build_ICMPv6EchoBody_from_well_know_values_when_to_bytes_method_is_called(self): 1259 # GIVEN 1260 body = ICMPv6EchoBody( 1261 0, 2, 1262 bytearray([ 1263 0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 1264 0x41 1265 ])) 1266 1267 # WHEN 1268 actual = body.to_bytes() 1269 1270 # THEN 1271 expected = bytearray([ 1272 0x00, 0x00, 0x00, 0x02, 0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 1273 0x41, 0x41, 0x41, 0x41 1274 ]) 1275 1276 self.assertEqual(expected, actual) 1277 1278 1279class TestICMPv6EchoBodyFactory(unittest.TestCase): 1280 1281 def test_should_build_ICMPv6EchoBody_when_to_bytes_method_is_called(self): 1282 # GIVEN 1283 identifier = any_identifier() 1284 sequence_number = any_sequence_number() 1285 body_data = any_data() 1286 1287 data = bytearray([(identifier >> 8) & 0xff, identifier & 0xff, 1288 (sequence_number >> 8) & 0xff, sequence_number & 0xff]) + body_data 1289 1290 factory = ICMPv6EchoBodyFactory() 1291 1292 # WHEN 1293 actual = factory.parse(io.BytesIO(data), any_message_info()) 1294 1295 # THEN 1296 self.assertTrue(isinstance(actual, ICMPv6EchoBody)) 1297 1298 self.assertEqual(identifier, actual.identifier) 1299 self.assertEqual(sequence_number, actual.sequence_number) 1300 self.assertEqual(body_data, actual.data) 1301 1302 1303class TestICMPv6DestinationUnreachable(unittest.TestCase): 1304 1305 def test_should_convert_ICMPv6DestinationUnreachable_to_bytearray_when_to_bytes_method_is_called(self): 1306 # GIVEN 1307 data = any_data() 1308 1309 icmpv6_dest_unreachable = ICMPv6DestinationUnreachable(data) 1310 1311 # WHEN 1312 actual_data = icmpv6_dest_unreachable.to_bytes() 1313 1314 # THEN 1315 self.assertEqual(bytearray([0x00, 0x00, 0x00, 0x00]) + data, actual_data) 1316 1317 def test_should_convert_bytearray_to_ICMPv6DestinationUnreachable_when_from_bytes_method_is_called(self): 1318 # GIVEN 1319 data = any_data() 1320 1321 # WHEN 1322 icmpv6_dest_unreachable = ICMPv6DestinationUnreachable.from_bytes( 1323 io.BytesIO(bytearray([0x00, 0x00, 0x00, 0x00]) + data)) 1324 1325 # THEN 1326 self.assertEqual(data, icmpv6_dest_unreachable.data) 1327 1328 def test_should_raise_RuntimeError_when_from_bytes_method_is_called(self): 1329 # GIVEN 1330 data = any_data() 1331 1332 unused = random.randint(1, 1 << 32) 1333 1334 # WHEN 1335 self.assertRaises(RuntimeError, ICMPv6DestinationUnreachable.from_bytes, 1336 io.BytesIO(bytearray(struct.pack(">I", unused)) + data)) 1337 1338 1339class TestICMPv6DestinationUnreachableFactory(unittest.TestCase): 1340 1341 def test_should_create_ICMPv6DestinationUnreachable_when_parse_method_is_called(self): 1342 # GIVEN 1343 icmp_data = any_data() 1344 1345 factory = ICMPv6DestinationUnreachableFactory() 1346 1347 data = bytearray([0x00, 0x00, 0x00, 0x00]) + icmp_data 1348 1349 # WHEN 1350 icmpv6_dest_unreachable = factory.parse(io.BytesIO(data), any_message_info()) 1351 1352 # THEN 1353 self.assertEqual(icmp_data, icmpv6_dest_unreachable.data) 1354 1355 1356class TestUDPHeaderFactory(unittest.TestCase): 1357 1358 def test_should_create_UDPHeader_when_to_bytes_method_is_called(self): 1359 # GIVEN 1360 factory = UDPHeaderFactory() 1361 1362 src_port = any_port() 1363 dst_port = any_port() 1364 payload_length = any_payload_length() 1365 checksum = any_checksum() 1366 1367 data = struct.pack("!H", src_port) + struct.pack("!H", dst_port) + \ 1368 struct.pack("!H", payload_length) + struct.pack("!H", checksum) 1369 1370 # WHEN 1371 udp_header = factory.parse(io.BytesIO(data), any_message_info()) 1372 1373 # THEN 1374 self.assertEqual(src_port, udp_header.src_port) 1375 self.assertEqual(dst_port, udp_header.dst_port) 1376 self.assertEqual(payload_length, udp_header.payload_length) 1377 self.assertEqual(checksum, udp_header.checksum) 1378 1379 1380class TestHopByHopOptionsFactory(unittest.TestCase): 1381 1382 def test_should_create_option_from_bytearray_when_to_bytes_method_is_called(self): 1383 # GIVEN 1384 1385 class DummyOptionFactory: 1386 1387 def parse(self, data, message_info): 1388 return data.read() 1389 1390 factory = HopByHopOptionsFactory(options_factories={2: DummyOptionFactory()}) 1391 1392 data = bytearray([0x02, 0x03, 0x11, 0x22, 0x33, 0x01, 0x00]) 1393 1394 # WHEN 1395 actual_options = factory.parse(io.BytesIO(data), any_message_info()) 1396 1397 # THEN 1398 self.assertEqual(1, len(actual_options)) 1399 self.assertEqual(2, actual_options[0].header.type) 1400 self.assertEqual(3, actual_options[0].header.length) 1401 1402 1403if __name__ == "__main__": 1404 unittest.main() 1405