1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import io
31import random
32import string
33import struct
34import unittest
35
36from ipaddress import ip_address
37
38from ipv6 import (
39    ICMPv6Header,
40    UDPHeader,
41    IPv6Header,
42    IPv6PacketFactory,
43    UDPDatagram,
44    UDPDatagramFactory,
45    ICMPv6Factory,
46    HopByHopFactory,
47    MPLOptionFactory,
48    ICMPv6,
49    HopByHopOptionHeader,
50    HopByHopOption,
51    HopByHop,
52    MPLOption,
53    IPv6Packet,
54    ICMPv6EchoBody,
55    BytesPayload,
56    ICMPv6EchoBodyFactory,
57    UpperLayerProtocol,
58    UDPHeaderFactory,
59    HopByHopOptionsFactory,
60    ICMPv6DestinationUnreachableFactory,
61    BytesPayloadFactory,
62    ICMPv6DestinationUnreachable,
63    UdpBasedOnSrcDstPortsPayloadFactory,
64    FragmentHeader,
65)
66
67import common
68
69
class HopByHopOptionBytesValue:
    """Test helper: a hop-by-hop option value backed by a raw byte buffer."""

    _value = "value"

    def __init__(self, _bytes):
        # The buffer is stored as given and handed back untouched by to_bytes().
        self.bytes = _bytes

    def __len__(self):
        return len(self.bytes)

    def to_bytes(self):
        """Return the wrapped buffer unchanged."""
        return self.bytes

    def to_dict(self, d=None):
        """Store the buffer under the ``"value"`` key of ``d`` and return ``d``.

        A new dictionary is created when ``d`` is ``None``.
        """
        if d is None:
            d = {}

        d[self._value] = self.bytes
        return d
89
90
class ICMPv6BytesBody:
    """Test helper: an ICMPv6 message body backed by a raw byte buffer."""

    _icmp_body = "icmp_body"

    def __init__(self, _bytes):
        self.bytes = _bytes

    def to_bytes(self):
        """Return the wrapped buffer unchanged."""
        return self.bytes

    def to_dict(self, d=None):
        """Store the buffer under the ``"icmp_body"`` key of ``d`` and return ``d``.

        A new dictionary is created when ``d`` is ``None``, so the method can
        be called without arguments.
        """
        # Without this guard the default argument would be subscripted as None.
        d = d if d is not None else dict()

        d[self._icmp_body] = self.bytes
        return d

    def __len__(self):
        return len(self.bytes)
109
110
class ICMPv6BytesBodyFactory:
    """Test helper: factory that wraps all remaining input in an ICMPv6BytesBody."""

    def parse(self, data, context):
        # ``context`` is accepted but not used by this factory.
        remaining = data.read()
        return ICMPv6BytesBody(remaining)
116
117
class DummyHeader:
    """Test helper: header stand-in that only carries a ``checksum`` attribute."""

    def __init__(self):
        # The checksum starts out as zero.
        self.checksum = 0
122
123
class DummyUpperLayerProtocol(UpperLayerProtocol):
    """Test helper: upper-layer protocol with a fixed type and pre-built payload bytes."""

    def __init__(self, header, data, _type):
        super().__init__(header)
        self._data = data
        self._type = _type

    def __len__(self):
        return len(self._data)

    @property
    def type(self):
        """Protocol type given at construction."""
        return self._type

    def to_bytes(self):
        """Return the payload given at construction, unchanged."""
        return self._data
140
141
def any_uint(bits):
    """Return a random integer in the inclusive range ``0 .. 2**bits - 1``."""
    upper_bound = (1 << bits) - 1
    return random.randint(0, upper_bound)
144
145
def any_type():
    """Return a random 8-bit value for use as a type field."""
    return any_uint(bits=8)
148
149
def any_code():
    """Return a random 8-bit value for use as a code field."""
    return any_uint(bits=8)
152
153
def any_checksum():
    """Return a random 16-bit value for use as a checksum field."""
    return any_uint(bits=16)
156
157
def any_fragment_offset():
    """Return a random 13-bit value for use as a fragment offset."""
    return any_uint(bits=13)
160
161
def any_bool():
    """Return ``True`` or ``False`` depending on one random bit."""
    random_bit = any_uint(bits=1)
    return random_bit == 1
164
165
def any_fragment_identification():
    """Return a random 32-bit value for use as a fragment identification."""
    return any_uint(bits=32)
168
169
def any_icmp_payload(_type, code, checksum, body):
    """Build raw ICMP bytes: type, code, big-endian 16-bit checksum, then ``body``."""
    checksum_high = (checksum >> 8) & 0xff
    checksum_low = checksum & 0xff

    header = bytearray((_type, code, checksum_high, checksum_low))
    return header + body
172
173
def any_udp_payload(src_port, dst_port, payload, checksum):
    """Build raw UDP bytes: an 8-byte header followed by ``payload``.

    The length field is the payload length plus the 8 header bytes; every
    header field is written as a big-endian 16-bit value.
    """

    def _be16(value):
        # Two bytes, most significant first; higher bits are masked off.
        return [(value >> 8) & 0xff, value & 0xff]

    total_length = len(payload) + 8
    header_bytes = _be16(src_port) + _be16(dst_port) + _be16(total_length) + _be16(checksum)
    return bytearray(header_bytes) + payload
179
180
def any_hop_by_hop_payload(next_header, hdr_ext_len, payload):
    """Build raw bytes: ``next_header``, ``hdr_ext_len``, then ``payload``."""
    leading_bytes = bytearray((next_header, hdr_ext_len))
    return leading_bytes + payload
183
184
def any_body():
    """Return a random body of 0..255 bytes made of ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits + string.hexdigits
    size = any_uint(8)

    chars = [random.choice(alphabet) for _ in range(size)]
    return bytearray("".join(chars).encode("utf-8"))
189
190
def any_payload():
    """Return a random payload of 0..255 bytes drawn from ``string.printable``."""
    size = any_uint(8)

    chars = [random.choice(string.printable) for _ in range(size)]
    return bytearray("".join(chars).encode("utf-8"))
195
196
def any_ip_address():
    """Return a random 16-byte IPv6 address in ``fe80::/64`` as a bytearray.

    The first 8 bytes are the fixed prefix ``fe80:0000:0000:0000``; the last
    8 bytes are drawn independently at random.
    """
    prefix = bytearray([0xfe, 0x80]) + bytearray(6)
    # Draw every byte separately: repeating a single random byte eight times
    # would allow only 256 distinct addresses.
    interface_id = bytearray(random.getrandbits(8) for _ in range(8))
    return prefix + interface_id
199
200
def any_port():
    """Return a random 16-bit value for use as a port number."""
    return any_uint(bits=16)
203
204
def any_mpl_opt_type():
    """Return a random 8-bit value for use as an MPL option type."""
    return any_uint(bits=8)
207
208
def any_mpl_opt_data_len():
    """Return a random 8-bit value for use as an MPL option data length."""
    return any_uint(bits=8)
211
212
def any_mpl_S():
    """Return a random 2-bit value (0..3) for the MPL option ``S`` field."""
    return any_uint(bits=2)
215
216
def any_mpl_M():
    """Return a random 1-bit value (0 or 1) for the MPL option ``M`` field."""
    return any_uint(bits=1)
219
220
def any_mpl_V():
    """Return a random 1-bit value (0 or 1) for the MPL option ``V`` field."""
    return any_uint(bits=1)
223
224
def any_mpl_sequence():
    """Return a random 8-bit value for use as an MPL sequence number."""
    return any_uint(bits=8)
227
228
def any_mpl_seed_id(S):
    """Return a random seed id whose length is ``MPLOption._seed_id_length[S]``."""
    alphabet = string.ascii_letters + string.digits + string.hexdigits
    size = MPLOption._seed_id_length[S]

    chars = [random.choice(alphabet) for _ in range(size)]
    return bytearray("".join(chars).encode("utf-8"))
233
234
def any_next_header():
    """Return a random 8-bit value for use as a next-header field."""
    return any_uint(bits=8)
237
238
def any_traffic_class():
    """Return a random 8-bit value for use as a traffic class."""
    return any_uint(bits=8)
241
242
def any_flow_label():
    """Return a random 20-bit value for use as a flow label."""
    return any_uint(bits=20)
245
246
def any_hop_limit():
    """Return a random 8-bit value for use as a hop limit."""
    return any_uint(bits=8)
249
250
def any_payload_length():
    """Return a random 16-bit value for use as a payload length."""
    return any_uint(bits=16)
253
254
def any_hdr_ext_len():
    """Return a random 3-bit value (0..7) for use as a header extension length."""
    return any_uint(bits=3)
257
258
def any_length():
    """Return a random 4-bit value (0..15) for use as a length."""
    return any_uint(bits=4)
261
262
def any_str(length=8):
    """Return ``length`` random characters from ``string.printable``, UTF-8 encoded."""
    chars = [random.choice(string.printable) for _ in range(length)]
    text = "".join(chars)
    return text.encode("utf-8")
266
267
def any_bytes(length=4):
    """Return a bytearray of ``length`` random printable bytes."""
    random_text = any_str(length=length)
    return bytearray(random_text)
270
271
def any_dict(keys_count=4):
    """Return a dictionary mapping random keys to random byte values.

    ``keys_count`` keys are generated; colliding keys collapse into a single
    entry, so the result may hold fewer items.
    """
    # All keys are drawn first, then one value per key, in key order.
    names = [any_str() for _ in range(keys_count)]
    return {name: any_bytes() for name in names}
280
281
def any_mpl_option():
    """Return an MPLOption built from random field values.

    The seed id is generated to match the randomly chosen ``S`` value.
    """
    s_value = any_mpl_S()
    m_value = any_mpl_M()
    v_value = any_mpl_V()
    sequence_number = any_mpl_sequence()
    seed = any_mpl_seed_id(s_value)

    return MPLOption(S=s_value, M=m_value, V=v_value, sequence=sequence_number, seed_id=seed)
290
291
def any_hop_by_hop_bytes_option_header(length=4):
    """Return a HopByHopOptionHeader with a random non-padding type and ``length``."""
    # 0 or 1 means padding, so keep drawing until the type is higher than 1
    while True:
        option_type = any_type()
        if option_type > 1:
            break

    return HopByHopOptionHeader(option_type, length)
300
301
def any_hop_by_hop_bytes_value(length=2):
    """Return a HopByHopOptionBytesValue wrapping ``length`` random bytes."""
    raw_value = any_bytes(length=length)
    return HopByHopOptionBytesValue(raw_value)
304
305
def any_hop_by_hop_bytes_option():
    """Return a HopByHopOption whose header length matches its random bytes value."""
    size = any_length()

    # Header first, then value, so the random draws happen in that order.
    option_header = any_hop_by_hop_bytes_option_header(size)
    option_value = any_hop_by_hop_bytes_value(size)
    return HopByHopOption(option_header, option_value)
309
310
def any_hop_by_hop_mpl_option():
    """Return a HopByHopOption carrying a random MPLOption as its value."""
    option_value = any_mpl_option()
    option_header = any_hop_by_hop_bytes_option_header(len(option_value))
    return HopByHopOption(option_header, option_value)
314
315
def any_identifier():
    """Return a random 16-bit value for use as an identifier."""
    return any_uint(bits=16)
318
319
def any_sequence_number():
    """Return a random 16-bit value for use as a sequence number."""
    return any_uint(bits=16)
322
323
def any_data():
    """Return between 0 and 32 (inclusive) random printable bytes."""
    size = random.randint(0, 32)
    return any_bytes(size)
326
327
def any_upper_layer_payload(data, _type):
    """Return a DummyUpperLayerProtocol holding ``data`` and ``_type`` with a fresh DummyHeader."""
    header = DummyHeader()
    return DummyUpperLayerProtocol(header, data, _type)
330
331
def any_extension_headers():
    """Return a new, empty list of extension headers."""
    return list()
334
335
def any_message_info():
    """Return a new ``common.MessageInfo`` built with no arguments."""
    message_info = common.MessageInfo()
    return message_info
338
339
class TestIPv6Header(unittest.TestCase):
    """Tests for IPv6Header serialization, parsing and length reporting."""

    def test_should_convert_IPv6_header_to_bytes_when_to_bytes_method_is_called(self):
        # GIVEN
        traffic_class = any_traffic_class()
        flow_label = any_flow_label()
        payload_length = any_payload_length()
        next_header = any_next_header()
        hop_limit = any_hop_limit()
        source_address = any_ip_address()
        destination_address = any_ip_address()

        ipv6_header = IPv6Header(source_address, destination_address, traffic_class, flow_label, hop_limit,
                                 payload_length, next_header)

        # WHEN
        data = ipv6_header.to_bytes()

        # THEN
        # First 32 bits: 4-bit version, 8-bit traffic class, 20-bit flow label.
        self.assertEqual(6, data[0] >> 4)
        self.assertEqual(traffic_class, ((data[0] << 8 | data[1]) >> 4) & 0xff)
        self.assertEqual(flow_label, ((data[1] & 0x0F) << 16) | (data[2] << 8) | data[3])
        self.assertEqual(payload_length, struct.unpack("!H", data[4:6])[0])
        self.assertEqual(next_header, data[6])
        self.assertEqual(hop_limit, data[7])
        self.assertEqual(source_address, data[8:24])
        self.assertEqual(destination_address, data[24:40])

    def test_should_create_IPv6Header_when_from_bytes_classmethod_is_called(self):
        # GIVEN
        traffic_class = any_traffic_class()
        flow_label = any_flow_label()
        payload_length = any_payload_length()
        next_header = any_next_header()
        hop_limit = any_hop_limit()
        source_address = any_ip_address()
        destination_address = any_ip_address()

        # Pack the same layout by hand that to_bytes() is checked against above.
        data = bytearray([(6 << 4) | (traffic_class >> 4), (traffic_class & 0xF) << 4 | (flow_label >> 16) & 0xF,
                          (flow_label >> 8) & 0xff, flow_label & 0xff, payload_length >> 8, payload_length & 0xff,
                          next_header, hop_limit])
        data += ip_address(bytes(source_address)).packed + ip_address(bytes(destination_address)).packed

        # WHEN
        ipv6_header = IPv6Header.from_bytes(io.BytesIO(data))

        # THEN
        self.assertEqual(6, ipv6_header.version)
        self.assertEqual(traffic_class, ipv6_header.traffic_class)
        self.assertEqual(flow_label, ipv6_header.flow_label)
        self.assertEqual(payload_length, ipv6_header.payload_length)
        self.assertEqual(next_header, ipv6_header.next_header)
        self.assertEqual(hop_limit, ipv6_header.hop_limit)
        self.assertEqual(source_address, ipv6_header.source_address.packed)
        self.assertEqual(destination_address, ipv6_header.destination_address.packed)

    def test_should_return_proper_header_length_when_IPv6Packet_object_is_called_in_len(self):
        # GIVEN
        # Argument order matches the constructor call in the to_bytes test:
        # addresses first, then traffic class, flow label, hop limit,
        # payload length and next header.
        ipv6_header = IPv6Header(any_ip_address(), any_ip_address(), any_traffic_class(), any_flow_label(),
                                 any_hop_limit(), any_payload_length(), any_next_header())

        # WHEN
        ipv6_header_length = len(ipv6_header)

        # THEN
        self.assertEqual(40, ipv6_header_length)
406
407
class TestUDPHeader(unittest.TestCase):
    """Tests for UDPHeader serialization, parsing, length and protocol type."""

    def test_should_convert_UDP_header_to_bytes_when_to_bytes_method_is_called(self):
        # GIVEN
        src_port, dst_port = any_port(), any_port()
        payload_length = any_payload_length()
        checksum = any_checksum()

        header = UDPHeader(src_port, dst_port, payload_length, checksum)

        # WHEN
        raw = header.to_bytes()

        # THEN
        # Every field is a big-endian 16-bit integer, checked in wire order.
        expected_fields = (
            (src_port, raw[0:2]),
            (dst_port, raw[2:4]),
            (payload_length, raw[4:6]),
            (checksum, raw[6:]),
        )
        for expected_value, chunk in expected_fields:
            self.assertEqual(expected_value, struct.unpack("!H", chunk)[0])

    def test_should_create_UDPHeader_when_from_bytes_classmethod_is_called(self):
        # GIVEN
        src_port, dst_port = any_port(), any_port()
        payload_length = any_payload_length()
        checksum = any_checksum()

        raw = struct.pack("!HHHH", src_port, dst_port, payload_length, checksum)

        # WHEN
        header = UDPHeader.from_bytes(io.BytesIO(raw))

        # THEN
        self.assertEqual(src_port, header.src_port)
        self.assertEqual(dst_port, header.dst_port)
        self.assertEqual(payload_length, header.payload_length)
        self.assertEqual(checksum, header.checksum)

    def test_should_return_proper_header_length_when_UDPHeader_object_is_called_in_len(self):
        # GIVEN
        header = UDPHeader(any_port(), any_port(), any_payload_length(), any_checksum())

        # WHEN
        header_length = len(header)

        # THEN
        self.assertEqual(8, header_length)

    def test_should_return_17_when_type_property_is_called(self):
        # GIVEN
        header = UDPHeader(any_port(), any_port(), any_payload_length(), any_checksum())

        # THEN
        protocol_type = header.type
        self.assertEqual(17, protocol_type)
463
464
class TestICMPv6Header(unittest.TestCase):
    """Tests for ICMPv6Header serialization, parsing and length reporting."""

    def test_should_convert_icmp_message_header_to_bytes_when_to_bytes_method_is_called(self):
        # GIVEN
        _type = any_type()
        code = any_code()
        checksum = any_checksum()

        header = ICMPv6Header(_type, code, checksum)

        # WHEN
        raw = header.to_bytes()

        # THEN
        # Layout: type byte, code byte, big-endian 16-bit checksum.
        self.assertEqual(_type, raw[0])
        self.assertEqual(code, raw[1])
        raw_checksum = struct.unpack("!H", raw[2:])[0]
        self.assertEqual(checksum, raw_checksum)

    def test_should_create_ICMPv6Header_when_to_bytes_classmethod_is_called(self):
        # GIVEN
        _type = any_type()
        code = any_code()
        checksum = any_checksum()

        raw = bytearray(struct.pack("!BBH", _type, code, checksum))

        # WHEN
        header = ICMPv6Header.from_bytes(io.BytesIO(raw))

        # THEN
        self.assertEqual(_type, header.type)
        self.assertEqual(code, header.code)
        self.assertEqual(checksum, header.checksum)

    def test_should_return_proper_header_length_when_ICMPv6Header_object_is_called_in_len(self):
        # GIVEN
        header = ICMPv6Header(any_type(), any_code(), any_checksum())

        # WHEN
        header_length = len(header)

        # THEN
        self.assertEqual(4, header_length)
508
509
class TestIPv6Packet(unittest.TestCase):
    """Tests that IPv6Packet.to_bytes() produces known byte sequences."""

    def test_should_build_IPv6Packet_with_ICMP_payload_from_well_know_values_when_to_bytes_method_is_called(self):
        # GIVEN
        header = IPv6Header(source_address="fd00:1234:4555::ff:fe00:1800", destination_address="ff03::1")

        echo_data = bytearray([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01]) + bytearray([0x41] * 10)
        icmp = ICMPv6(ICMPv6Header(128, 0), ICMPv6EchoBody(0, 2, echo_data))

        mpl_option = HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                    MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytearray([0x00, 0x18])))
        extension_headers = [HopByHop(options=[mpl_option])]

        ipv6_packet = IPv6Packet(header, icmp, extension_headers)

        # WHEN
        actual = ipv6_packet.to_bytes()

        # THEN
        expected = bytearray([
            # first 8 bytes of the IPv6 header (payload length 0x22, next header 0, hop limit 0x40)
            0x60, 0x00, 0x00, 0x00, 0x00, 0x22, 0x00, 0x40,
            # source address
            0xfd, 0x00, 0x12, 0x34, 0x45, 0x55, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xfe, 0x00, 0x18, 0x00,
            # destination address
            0xff, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
            # hop-by-hop header with the MPL option
            0x3a, 0x00, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18,
            # ICMPv6 header, then identifier and sequence number
            0x80, 0x00, 0x87, 0x12, 0x00, 0x00, 0x00, 0x02,
            # echo data
            0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
            0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
        ])

        self.assertEqual(expected, actual)

    def test_should_build_IPv6Packet_with_UDP_payload_from_well_know_values_when_to_bytes_method_is_called(self):
        # GIVEN
        header = IPv6Header(source_address="fe80::1", destination_address="ff02::2", hop_limit=255)

        udp_payload = BytesPayload(
            bytearray([
                0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03,
                0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80,
                0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef,
            ]))
        datagram = UDPDatagram(UDPHeader(src_port=19788, dst_port=19788), udp_payload)

        ipv6_packet = IPv6Packet(header, datagram)

        # WHEN
        actual = ipv6_packet.to_bytes()

        # THEN
        expected = bytearray([
            # first 8 bytes of the IPv6 header (payload length 0x28, next header 0x11, hop limit 0xff)
            0x60, 0x00, 0x00, 0x00, 0x00, 0x28, 0x11, 0xff,
            # source address
            0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
            # destination address
            0xff, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02,
            # UDP header: ports, length, checksum
            0x4d, 0x4c, 0x4d, 0x4c, 0x00, 0x28, 0xe9, 0xf4,
            # UDP payload
            0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03,
            0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80,
            0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef,
        ])

        self.assertEqual(expected, actual)
572
573
class TestIPv6PacketFactory(unittest.TestCase):
    """Tests for parsing raw bytes into IPv6 packets with IPv6PacketFactory."""

    # Packet with an ICMPv6 type-128 message and no extension headers,
    # shared by the two tests that parse the same input.
    _ICMP_PACKET_WITHOUT_EXTENSION_HEADERS = bytes([
        # first 8 bytes of the IPv6 header (payload length 0x1a, next header 0x3a, hop limit 0x40)
        0x60, 0x00, 0x00, 0x00, 0x00, 0x1A, 0x3A, 0x40,
        # source address
        0xfd, 0x00, 0x12, 0x34, 0x45, 0x55, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xfe, 0x00, 0x18, 0x00,
        # destination address
        0xff, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
        # ICMPv6 header, then identifier and sequence number
        0x80, 0x00, 0x87, 0x12, 0x00, 0x00, 0x00, 0x02,
        # echo data
        0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
        0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
    ])

    def test_should_create_IPv6Packet_with_MPL_and_ICMP_when_to_bytes_method_is_called(self):
        # GIVEN
        ipv6_packet_bytes = bytearray([
            # first 8 bytes of the IPv6 header (payload length 0x22, next header 0, hop limit 0x40)
            0x60, 0x00, 0x00, 0x00, 0x00, 0x22, 0x00, 0x40,
            # source address
            0xfd, 0x00, 0x12, 0x34, 0x45, 0x55, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xfe, 0x00, 0x18, 0x00,
            # destination address
            0xff, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
            # hop-by-hop header with the MPL option
            0x3a, 0x00, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18,
            # ICMPv6 header, then identifier and sequence number
            0x80, 0x00, 0x87, 0x12, 0x00, 0x00, 0x00, 0x02,
            # echo data
            0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
            0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
        ])

        hop_by_hop_factory = HopByHopFactory(hop_by_hop_options_factory=HopByHopOptionsFactory(
            options_factories={109: MPLOptionFactory()}))
        ipv6_factory = IPv6PacketFactory(ehf={0: hop_by_hop_factory},
                                         ulpf={58: ICMPv6Factory(body_factories={128: ICMPv6EchoBodyFactory()})})

        # WHEN
        ipv6_packet = ipv6_factory.parse(io.BytesIO(ipv6_packet_bytes), any_message_info())

        # THEN
        self.assertEqual('fd00:1234:4555::ff:fe00:1800', ipv6_packet.ipv6_header.source_address.compressed)
        self.assertEqual('ff03::1', ipv6_packet.ipv6_header.destination_address.compressed)
        self.assertEqual(64, ipv6_packet.ipv6_header.hop_limit)
        self.assertEqual(0, ipv6_packet.ipv6_header.next_header)
        self.assertEqual(34, ipv6_packet.ipv6_header.payload_length)
        self.assertEqual(0, ipv6_packet.ipv6_header.flow_label)
        self.assertEqual(0, ipv6_packet.ipv6_header.traffic_class)
        self.assertEqual(6, ipv6_packet.ipv6_header.version)

        self.assertEqual(1, ipv6_packet.extension_headers[0].options[0].value.S)
        self.assertEqual(0, ipv6_packet.extension_headers[0].options[0].value.M)
        self.assertEqual(0, ipv6_packet.extension_headers[0].options[0].value.V)
        self.assertEqual(2, ipv6_packet.extension_headers[0].options[0].value.sequence)
        self.assertEqual(bytearray([0x00, 0x18]), ipv6_packet.extension_headers[0].options[0].value.seed_id)

        self.assertEqual(34578, ipv6_packet.upper_layer_protocol.header.checksum)
        self.assertEqual(128, ipv6_packet.upper_layer_protocol.header.type)
        self.assertEqual(0, ipv6_packet.upper_layer_protocol.header.code)
        self.assertEqual(0, ipv6_packet.upper_layer_protocol.body.identifier)
        self.assertEqual(2, ipv6_packet.upper_layer_protocol.body.sequence_number)
        self.assertEqual(b'\x80\x00\xc7\xbf\x00\x00\x00\x01AAAAAAAAAA', ipv6_packet.upper_layer_protocol.body.data)

    def test_should_create_IPv6Packet_without_any_extension_header_with_ICMP_when_to_bytes_method_is_called(self):
        # GIVEN
        ipv6_packet_bytes = bytearray(self._ICMP_PACKET_WITHOUT_EXTENSION_HEADERS)

        ipv6_factory = IPv6PacketFactory(ulpf={58: ICMPv6Factory(body_factories={128: ICMPv6EchoBodyFactory()})})

        # WHEN
        ipv6_packet = ipv6_factory.parse(io.BytesIO(ipv6_packet_bytes), any_message_info())
        ipv6_packet._validate_checksum()

        # THEN
        self.assertEqual('fd00:1234:4555::ff:fe00:1800', ipv6_packet.ipv6_header.source_address.compressed)
        self.assertEqual('ff03::1', ipv6_packet.ipv6_header.destination_address.compressed)
        self.assertEqual(64, ipv6_packet.ipv6_header.hop_limit)
        self.assertEqual(58, ipv6_packet.ipv6_header.next_header)
        self.assertEqual(26, ipv6_packet.ipv6_header.payload_length)
        self.assertEqual(0, ipv6_packet.ipv6_header.flow_label)
        self.assertEqual(0, ipv6_packet.ipv6_header.traffic_class)
        self.assertEqual(6, ipv6_packet.ipv6_header.version)

        self.assertEqual(34578, ipv6_packet.upper_layer_protocol.header.checksum)
        self.assertEqual(128, ipv6_packet.upper_layer_protocol.header.type)
        self.assertEqual(0, ipv6_packet.upper_layer_protocol.header.code)
        self.assertEqual(0, ipv6_packet.upper_layer_protocol.body.identifier)
        self.assertEqual(2, ipv6_packet.upper_layer_protocol.body.sequence_number)
        self.assertEqual(b'\x80\x00\xc7\xbf\x00\x00\x00\x01AAAAAAAAAA', ipv6_packet.upper_layer_protocol.body.data)

    def test_should_set_message_info_field_when_to_bytes_method_is_called(self):
        # GIVEN
        ipv6_packet_data = bytearray(self._ICMP_PACKET_WITHOUT_EXTENSION_HEADERS)

        # Seed both fields that are asserted below with a value that differs
        # from the packet's addresses, so the test proves parse() overwrote them.
        message_info = any_message_info()
        message_info.source_ipv6 = "ff::"
        message_info.destination_ipv6 = "ff::"

        factory = IPv6PacketFactory(ulpf={58: ICMPv6Factory(body_factories={128: ICMPv6EchoBodyFactory()})})

        # WHEN
        factory.parse(io.BytesIO(ipv6_packet_data), message_info)

        # THEN
        self.assertEqual("fd00:1234:4555::ff:fe00:1800", message_info.source_ipv6.compressed)
        self.assertEqual("ff03::1", message_info.destination_ipv6.compressed)
672
673
class TestUDPDatagram(unittest.TestCase):
    """Tests that UDPDatagram serializes its header followed by its payload."""

    def test_should_creates_bytes_from_UDPHeader_and_payload_when_to_bytes_method_is_called(self):
        # GIVEN
        src_port, dst_port = any_port(), any_port()
        checksum = any_checksum()

        payload = any_payload()
        # UDP length consists of UDP header length and payload length
        payload_length = 8 + len(payload)

        datagram = UDPDatagram(UDPHeader(src_port, dst_port, payload_length, checksum), BytesPayload(payload))

        # WHEN
        actual = datagram.to_bytes()

        # THEN
        expected = struct.pack("!HHHH", src_port, dst_port, payload_length, checksum) + payload

        self.assertEqual(expected, actual)
697
698
class TestIPv6FragmentHeader(unittest.TestCase):
    """Tests for FragmentHeader serialization and parsing."""

    def test_shold_convert_IPv6_fragment_header_to_bytes_when_to_bytes_method_is_called(self):
        # GIVEN
        next_header = any_type()
        offset = any_fragment_offset()
        more_flag = any_bool()
        identification = any_fragment_identification()

        fragment_header = FragmentHeader(next_header, offset, more_flag, identification)

        # WHEN
        actual = fragment_header.to_bytes()

        # THEN
        # 16-bit field: 13-bit offset, two zero bits, then the more flag in bit 0.
        offset_and_flag = (offset << 3) | more_flag
        expected = bytearray(struct.pack("!BBHI", next_header, 0x00, offset_and_flag, identification))

        self.assertEqual(expected, actual)

    def test_should_create_FragmentHeader_when_from_bytes_classmethod_is_called(self):
        # GIVEN
        next_header = any_type()
        offset = any_fragment_offset()
        more_flag = any_bool()
        identification = any_fragment_identification()

        # 16-bit field: 13-bit offset, two zero bits, then the more flag in bit 0.
        offset_and_flag = (offset << 3) | more_flag
        raw = bytearray(struct.pack("!BBHI", next_header, 0x00, offset_and_flag, identification))

        # WHEN
        fragment_header = FragmentHeader.from_bytes(io.BytesIO(raw))

        # THEN
        self.assertEqual(next_header, fragment_header.next_header)
        self.assertEqual(offset, fragment_header.offset)
        self.assertEqual(more_flag, fragment_header.more_flag)
        self.assertEqual(identification, fragment_header.identification)
737
738
class TestICMPv6(unittest.TestCase):
    """Tests that ICMPv6 serializes its header followed by its body."""

    def test_should_creates_bytes_from_ICMPv6Header_and_body_when_to_bytes_method_is_called(self):
        # GIVEN
        _type = any_type()
        code = any_code()
        checksum = any_checksum()
        body = any_body()

        message = ICMPv6(ICMPv6Header(_type, code, checksum), ICMPv6BytesBody(body))

        # WHEN
        actual = message.to_bytes()

        # THEN
        # Type byte, code byte, big-endian 16-bit checksum, then the raw body.
        expected = bytearray(struct.pack("!BBH", _type, code, checksum)) + body

        self.assertEqual(expected, actual)
759
760
class TestHopByHop(unittest.TestCase):
    """Tests that HopByHop serializes its options and pads to a multiple of 8 bytes."""

    def _calculate_hdr_ext_len(self, payload_len):
        """Return the number of 8-byte units needed for ``payload_len``, minus the first one."""
        full_units, remainder = divmod(payload_len, 8)

        # A partially filled unit still counts as a whole one.
        units = full_units + 1 if remainder != 0 else full_units

        if units == 0 and remainder == 0:
            return units

        return units - 1

    def _calculate_required_padding(self, content_length):
        """Return how many bytes are missing to reach the next multiple of 8."""
        excess_bytes = content_length & 0x7
        return 8 - excess_bytes if excess_bytes > 0 else 0

    def create_padding(self, padding_length):
        """Return padding bytes of total length ``padding_length``.

        One byte of padding is a single zero byte; longer padding is a 0x01
        byte, a length byte, and ``padding_length - 2`` zero bytes.
        """
        if padding_length == 1:
            return bytearray([0x00])

        if padding_length > 1:
            zero_count = padding_length - 2
            return bytearray([0x01, zero_count]) + bytearray(zero_count)

        return bytearray()

    def test_should_create_bytes_from_HopByHop_when_to_bytes_method_is_called(self):
        # GIVEN
        next_header = any_next_header()
        option = any_hop_by_hop_bytes_option()
        # 2 accounts for the next-header and hdr-ext-len bytes written below.
        hdr_ext_len = self._calculate_hdr_ext_len(2 + len(option))

        hop_by_hop = HopByHop(next_header, [option])

        # WHEN
        actual = hop_by_hop.to_bytes()

        # THEN
        expected = bytearray([next_header, hdr_ext_len]) + option.to_bytes()
        expected += self.create_padding(self._calculate_required_padding(len(expected)))

        self.assertEqual(expected, actual)
809
810
class TestMPLOption(unittest.TestCase):
    """Tests for MPLOption serialization, parsing, seed-id lengths and length reporting."""

    def test_should_convert_MPLOption_to_bytes_when_to_bytes_method_is_called(self):
        # GIVEN
        S = any_mpl_S()
        M = any_mpl_M()
        V = any_mpl_V()
        sequence = any_mpl_sequence()
        seed_id = any_mpl_seed_id(S)

        option = MPLOption(S, M, V, sequence, seed_id)

        # WHEN
        actual = option.to_bytes()

        # THEN
        # Control byte: S in bits 7-6, M in bit 5, V in bit 4.
        control_byte = (S << 6) | (M << 5) | (V << 4)
        expected = bytearray([control_byte, sequence]) + seed_id
        self.assertEqual(expected, actual)

    def test_should_create_MPLOption_when_to_bytes_method_is_called_with_data(self):
        # GIVEN
        S = any_mpl_S()
        M = any_mpl_M()
        V = any_mpl_V()
        sequence = any_mpl_sequence()
        seed_id = any_mpl_seed_id(S)

        # Control byte: S in bits 7-6, M in bit 5, V in bit 4.
        control_byte = (S << 6) | (M << 5) | (V << 4)
        raw = bytearray([control_byte, sequence]) + seed_id

        # WHEN
        option = MPLOption.from_bytes(io.BytesIO(raw))

        # THEN
        self.assertEqual(S, option.S)
        self.assertEqual(M, option.M)
        self.assertEqual(V, option.V)
        self.assertEqual(sequence, option.sequence)
        self.assertEqual(seed_id, option.seed_id)

    def test_check_if_mpl_seed_id_length_values_was_not_changed(self):
        # Expected seed-id length for each S value, checked in ascending S order.
        for s_value, expected_length in ((0, 0), (1, 2), (2, 8), (3, 16)):
            self.assertEqual(expected_length, MPLOption._seed_id_length[s_value])

    def test_should_return_proper_length_when_len_is_called_with_mpl_option_object(self):
        # GIVEN
        S = any_mpl_S()
        M = any_mpl_M()
        V = any_mpl_V()
        sequence = any_mpl_sequence()
        seed_id = any_mpl_seed_id(S)

        option = MPLOption(S, M, V, sequence, seed_id)

        # WHEN
        option_length = len(option)

        # THEN
        # Two bytes precede the seed id: the S/M/V byte and the sequence byte.
        expected_length = 2 + len(seed_id)
        self.assertEqual(expected_length, option_length)
872
873
class TestclassHopByHopOption(unittest.TestCase):
    """Tests for HopByHopOption serialization and length reporting."""

    def test_should_convert_HopByHopOption_to_bytes_when_to_bytes_method_is_called(self):
        # GIVEN
        size = any_length()
        option_header = any_hop_by_hop_bytes_option_header(size)
        option_value = any_hop_by_hop_bytes_value(size)

        option = HopByHopOption(option_header, option_value)

        # WHEN
        actual = option.to_bytes()

        # THEN
        # The option is its header bytes followed directly by its value bytes.
        expected = option_header.to_bytes() + option_value.to_bytes()
        self.assertEqual(expected, actual)

    def test_should_return_length_of_HopByHopOption_when_len_is_called_with_hop_by_hop_option_object(self):
        # GIVEN
        size = any_length()
        option_header = any_hop_by_hop_bytes_option_header(size)
        option_value = any_hop_by_hop_bytes_value(size)

        option = HopByHopOption(option_header, option_value)

        # WHEN
        option_length = len(option)

        # THEN
        # Two header bytes plus the value bytes.
        expected_length = 2 + size
        self.assertEqual(expected_length, option_length)
906
907
class TestHopByHopOptionHeader(unittest.TestCase):
    """Serialization, parsing and length checks for HopByHopOptionHeader."""

    def test_should_convert_HopByHopOptionHeader_to_bytes_when_to_bytes_method_is_called(self):
        # GIVEN
        _type = any_type()
        length = any_length()

        header = HopByHopOptionHeader(_type, length)

        # WHEN
        serialized = header.to_bytes()

        # THEN
        # Expected wire form is exactly two octets: the type, then the length.
        self.assertEqual(bytearray((_type, length)), serialized)

    def test_should_create_HopByHopOptionHeader_when_to_bytes_method_is_called_with_data(self):
        # NOTE: despite the method name, the call under test is from_bytes().

        # GIVEN
        _type = any_type()
        length = any_length()

        encoded = bytearray((_type, length))

        # WHEN
        parsed = HopByHopOptionHeader.from_bytes(io.BytesIO(encoded))

        # THEN
        self.assertEqual(_type, parsed.type)
        self.assertEqual(length, parsed.length)

    def test_should_return_proper_length_when_len_is_called_with_HopByHopOptionHeader_object(self):
        # GIVEN
        _type = any_type()
        length = any_length()

        header = HopByHopOptionHeader(_type, length)

        # WHEN
        actual_size = len(header)

        # THEN
        # The header size is fixed and does not depend on the length field's value.
        self.assertEqual(2, actual_size)
951
952
class TestHopByHopFactory(unittest.TestCase):
    """Parsing checks for HopByHopFactory, plus helpers that frame the test input."""

    def _calculate_hdr_ext_len(self, payload_length):
        # Count of complete 8-octet units in payload_length, reduced by one when
        # the length is an exact, non-zero multiple of 8.
        whole_units, leftover = divmod(payload_length, 8)

        if leftover == 0 and whole_units > 0:
            return whole_units - 1

        return whole_units

    def padding(self, content_length):
        # Returns the octets needed to round content_length up to a multiple of 8
        # (an empty bytearray when it is already aligned).
        leftover = content_length % 8
        if leftover == 0:
            return bytearray()

        missing = 8 - leftover
        if missing == 1:
            # One octet short: a lone 0x00 octet (the Pad1 encoding).
            return bytearray([0x00])

        # Two or more octets short: 0x01, a length octet, then that many zero
        # octets (the PadN encoding).
        fill = missing - 2
        return bytearray([0x01, fill]) + bytearray(fill)

    def test_should_create_HopByHop_object_instance_when_to_bytes_method_is_called_with_data(self):
        # NOTE: despite the method name, the call under test is factory.parse().

        # GIVEN
        expected_option = any_hop_by_hop_mpl_option()
        option_type = expected_option.header.type

        next_header = any_next_header()
        # The two leading octets (next header, hdr ext len) count towards the length.
        hdr_ext_len = self._calculate_hdr_ext_len(2 + len(expected_option))

        options_factory = HopByHopOptionsFactory(options_factories={option_type: MPLOptionFactory()})
        factory = HopByHopFactory(hop_by_hop_options_factory=options_factory)

        raw_header = bytearray([next_header, hdr_ext_len]) + expected_option.to_bytes()
        raw_header += self.padding(len(raw_header))

        # WHEN
        parsed = factory.parse(io.BytesIO(raw_header), any_message_info())

        # THEN
        expected_value = expected_option.value
        parsed_value = parsed.options[0].value
        self.assertEqual(expected_value.S, parsed_value.S)
        self.assertEqual(expected_value.V, parsed_value.V)
        self.assertEqual(expected_value.M, parsed_value.M)
        self.assertEqual(expected_value.sequence, parsed_value.sequence)
        self.assertEqual(expected_value.seed_id, parsed_value.seed_id)

    def test_should_raise_RuntimeError_when_no_option_factory_is_set_and_parse_method_is_called(self):
        # GIVEN
        option = any_hop_by_hop_mpl_option()

        next_header = any_next_header()
        hdr_ext_len = self._calculate_hdr_ext_len(2 + len(option))

        # No per-type option factories are registered here.
        factory = HopByHopFactory(hop_by_hop_options_factory=HopByHopOptionsFactory())

        raw_header = bytes([next_header, hdr_ext_len]) + option.to_bytes()
        raw_header += self.padding(len(raw_header))

        # Build the arguments outside the context manager so that only parse()
        # itself is allowed to raise.
        stream = io.BytesIO(raw_header)
        message_info = any_message_info()

        # THEN
        with self.assertRaises(RuntimeError):
            factory.parse(stream, message_info)
1015
1016
class TestMPLOptionFactory(unittest.TestCase):
    """Parsing checks for MPLOptionFactory."""

    def test_should_produce_MPLOption_from_bytes_when_to_bytes_method_is_called_with_data(self):
        # NOTE: despite the method name, the call under test is factory.parse().

        # GIVEN
        S = any_mpl_S()
        M = any_mpl_M()
        V = any_mpl_V()
        sequence = any_mpl_sequence()
        seed_id = any_mpl_seed_id(S)

        # The first octet packs S at bit 6, M at bit 5 and V at bit 4.
        flags_octet = (S << 6) | (M << 5) | (V << 4)
        encoded = bytearray([flags_octet, sequence]) + seed_id

        factory = MPLOptionFactory()

        # WHEN
        parsed = factory.parse(io.BytesIO(encoded), any_message_info())

        # THEN
        self.assertEqual(parsed.S, S)
        self.assertEqual(parsed.M, M)
        self.assertEqual(parsed.V, V)
        self.assertEqual(parsed.sequence, sequence)
        self.assertEqual(parsed.seed_id, seed_id)
1041
1042
class TestUdpBasedOnSrcDstPortsPayloadFactory(unittest.TestCase):
    """Checks how UdpBasedOnSrcDstPortsPayloadFactory picks a payload factory by port."""

    def test_should_create_payload_from_data_when_src_port_factory_is_defined_and_parse_method_is_called(self):
        # GIVEN
        payload_bytes = any_data()

        info = common.MessageInfo()
        info.src_port = any_port()
        info.dst_port = any_port()

        # A payload factory is registered under the source port only.
        factories_by_port = {info.src_port: BytesPayloadFactory()}
        factory = UdpBasedOnSrcDstPortsPayloadFactory(src_dst_port_based_payload_factories=factories_by_port)

        # WHEN
        parsed = factory.parse(io.BytesIO(payload_bytes), info)

        # THEN
        self.assertEqual(payload_bytes, parsed.data)

    def test_should_create_payload_from_data_when_dst_port_factory_is_defined_and_parse_method_is_called(self):
        # GIVEN
        payload_bytes = any_data()

        info = common.MessageInfo()
        info.src_port = any_port()
        info.dst_port = any_port()

        # A payload factory is registered under the destination port only.
        factories_by_port = {info.dst_port: BytesPayloadFactory()}
        factory = UdpBasedOnSrcDstPortsPayloadFactory(src_dst_port_based_payload_factories=factories_by_port)

        # WHEN
        parsed = factory.parse(io.BytesIO(payload_bytes), info)

        # THEN
        self.assertEqual(payload_bytes, parsed.data)

    def test_should_raise_RuntimeError_when_parse_method_is_called_but_required_factory_is_not_defined(self):
        # GIVEN
        payload_bytes = any_data()

        info = common.MessageInfo()
        info.src_port = any_port()
        info.dst_port = any_port()

        # Nothing is registered, so neither port can be resolved to a factory.
        factory = UdpBasedOnSrcDstPortsPayloadFactory(src_dst_port_based_payload_factories={})

        stream = io.BytesIO(payload_bytes)

        # THEN
        with self.assertRaises(RuntimeError):
            factory.parse(stream, info)
1091
1092
class TestUDPDatagramFactory(unittest.TestCase):
    """Parsing checks for UDPDatagramFactory."""

    def test_should_produce_UDPDatagram_from_bytes_when_to_bytes_method_is_called_with_data(self):
        # NOTE: despite the method name, the call under test is factory.parse().

        # GIVEN
        src_port = any_port()
        dst_port = any_port()
        checksum = any_checksum()

        payload = any_payload()
        # The length field covers the payload plus the UDP header itself.
        payload_length = len(payload) + len(UDPHeader(0, 0))

        # Four big-endian 16-bit header fields, followed by the payload.
        header_bytes = struct.pack("!HHHH", src_port, dst_port, payload_length, checksum)
        raw_datagram = bytearray(header_bytes) + payload

        factory = UDPDatagramFactory(UDPHeaderFactory(), BytesPayloadFactory())

        # WHEN
        datagram = factory.parse(io.BytesIO(raw_datagram), any_message_info())

        # THEN
        parsed_header = datagram.header
        self.assertEqual(parsed_header.src_port, src_port)
        self.assertEqual(parsed_header.dst_port, dst_port)
        self.assertEqual(parsed_header.payload_length, payload_length)
        self.assertEqual(parsed_header.checksum, checksum)
        self.assertEqual(datagram.payload.data, payload)

    def test_should_set_src_and_dst_port_in_message_info_when_parse_method_is_called(self):
        # GIVEN
        message_info = any_message_info()

        src_port = any_port()
        dst_port = any_port()
        checksum = any_checksum()

        payload = any_payload()
        payload_length = len(payload) + len(UDPHeader(0, 0))

        # Four big-endian 16-bit header fields, followed by the payload.
        header_bytes = struct.pack("!HHHH", src_port, dst_port, payload_length, checksum)
        raw_datagram = bytearray(header_bytes) + payload

        factory = UDPDatagramFactory(UDPHeaderFactory(), BytesPayloadFactory())

        # WHEN
        factory.parse(io.BytesIO(raw_datagram), message_info)

        # THEN
        # Parsing is expected to record both ports on the message_info it was given.
        self.assertEqual(src_port, message_info.src_port)
        self.assertEqual(dst_port, message_info.dst_port)
1150
1151
class TestICMPv6Factory(unittest.TestCase):
    """Parsing checks for ICMPv6Factory."""

    def test_should_produce_ICMPv6_from_bytes_when_to_bytes_method_is_called_with_data(self):
        # NOTE: despite the method name, the call under test is factory.parse().

        # GIVEN
        _type = any_type()
        code = any_code()
        checksum = any_checksum()
        body = any_body()

        # One octet of type, one of code, a big-endian 16-bit checksum, then the body.
        raw_message = bytearray(struct.pack("!BBH", _type, code, checksum)) + body

        factory = ICMPv6Factory(body_factories={_type: ICMPv6BytesBodyFactory()})

        # WHEN
        message = factory.parse(io.BytesIO(raw_message), any_message_info())

        # THEN
        parsed_header = message.header
        self.assertEqual(parsed_header.type, _type)
        self.assertEqual(parsed_header.code, code)
        self.assertEqual(parsed_header.checksum, checksum)
        self.assertEqual(message.body.bytes, body)

    def test_should_raise_RuntimeError_when_method_parse_is_called_but_body_factory_is_not_present(self):
        # GIVEN
        _type = any_type()
        code = any_code()
        checksum = any_checksum()
        body = any_body()

        raw_message = struct.pack("!BBH", _type, code, checksum) + body

        # No body factories are supplied to the ICMPv6Factory here.
        factory = ICMPv6Factory()

        stream = io.BytesIO(raw_message)
        message_info = any_message_info()

        # WHEN
        with self.assertRaises(RuntimeError):
            factory.parse(stream, message_info)
1187
1188
class TestBytesPayload(unittest.TestCase):
    """Construction, serialization and length checks for BytesPayload."""

    def test_should_create_BytesPayload_when_from_bytes_class_method_is_called(self):
        # GIVEN
        raw = any_data()

        # WHEN
        payload = BytesPayload.from_bytes(raw)

        # THEN
        self.assertEqual(raw, payload.data)

    def test_should_return_exactly_the_same_data_as_passed_to_constructor_when_to_bytes_method_is_called(self):
        # GIVEN
        raw = any_data()
        payload = BytesPayload(raw)

        # WHEN
        serialized = payload.to_bytes()

        # THEN
        self.assertEqual(raw, serialized)

    def test_should_return_the_same_len_as_data_passed_to_constructor_when_len_is_called_on_BytesPayload_object(self):
        # GIVEN
        raw = any_data()
        payload = BytesPayload(raw)

        # WHEN
        payload_size = len(payload)

        # THEN
        self.assertEqual(len(raw), payload_size)
1222
1223
class TestICMPv6EchoBody(unittest.TestCase):
    """Serialization and parsing checks for ICMPv6EchoBody."""

    def test_convert_ICMPv6_echo_body_to_data_when_to_bytes_method_is_called(self):
        # GIVEN
        identifier = any_identifier()
        sequence_number = any_sequence_number()
        data = any_data()

        body = ICMPv6EchoBody(identifier, sequence_number, data)

        # WHEN
        serialized = body.to_bytes()

        # THEN
        # Big-endian 16-bit identifier and sequence number, then the echo data.
        expected = bytearray(struct.pack("!HH", identifier, sequence_number)) + data
        self.assertEqual(expected, serialized)

    def test_should_create_ICMPv6EchoBody_from_data_when_from_bytes_classmethod_is_called(self):
        # GIVEN
        identifier = any_identifier()
        sequence_number = any_sequence_number()
        body_data = any_data()

        encoded = bytearray(struct.pack("!HH", identifier, sequence_number)) + body_data

        # WHEN
        parsed = ICMPv6EchoBody.from_bytes(io.BytesIO(encoded))

        # THEN
        self.assertEqual(identifier, parsed.identifier)
        self.assertEqual(sequence_number, parsed.sequence_number)
        self.assertEqual(body_data, parsed.data)

    def test_should_build_ICMPv6EchoBody_from_well_know_values_when_to_bytes_method_is_called(self):
        # Fixed-vector check: identifier 0, sequence number 2 and an 18-octet payload.

        # GIVEN
        payload_prefix = [0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01]
        payload_filler = [0x41] * 10
        body = ICMPv6EchoBody(0, 2, bytearray(payload_prefix + payload_filler))

        # WHEN
        serialized = body.to_bytes()

        # THEN
        identifier_and_sequence = [0x00, 0x00, 0x00, 0x02]
        expected = bytearray(identifier_and_sequence + payload_prefix + payload_filler)

        self.assertEqual(expected, serialized)
1277
1278
class TestICMPv6EchoBodyFactory(unittest.TestCase):
    """Parsing checks for ICMPv6EchoBodyFactory."""

    def test_should_build_ICMPv6EchoBody_when_to_bytes_method_is_called(self):
        # NOTE: despite the method name, the call under test is factory.parse().

        # GIVEN
        identifier = any_identifier()
        sequence_number = any_sequence_number()
        body_data = any_data()

        # Identifier and sequence number are laid out high octet first, then the data.
        data = bytearray([(identifier >> 8) & 0xff, identifier & 0xff,
                          (sequence_number >> 8) & 0xff, sequence_number & 0xff]) + body_data

        factory = ICMPv6EchoBodyFactory()

        # WHEN
        actual = factory.parse(io.BytesIO(data), any_message_info())

        # THEN
        # assertIsInstance reports the offending object and expected class on
        # failure; assertTrue(isinstance(...)) would only say "False is not true".
        self.assertIsInstance(actual, ICMPv6EchoBody)

        self.assertEqual(identifier, actual.identifier)
        self.assertEqual(sequence_number, actual.sequence_number)
        self.assertEqual(body_data, actual.data)
1301
1302
class TestICMPv6DestinationUnreachable(unittest.TestCase):
    """Serialization and parsing checks for ICMPv6DestinationUnreachable."""

    def test_should_convert_ICMPv6DestinationUnreachable_to_bytearray_when_to_bytes_method_is_called(self):
        # GIVEN
        data = any_data()

        icmpv6_dest_unreachable = ICMPv6DestinationUnreachable(data)

        # WHEN
        actual_data = icmpv6_dest_unreachable.to_bytes()

        # THEN
        # The serialized form is four zero octets followed by the data.
        self.assertEqual(bytearray([0x00, 0x00, 0x00, 0x00]) + data, actual_data)

    def test_should_convert_bytearray_to_ICMPv6DestinationUnreachable_when_from_bytes_method_is_called(self):
        # GIVEN
        data = any_data()

        # WHEN
        icmpv6_dest_unreachable = ICMPv6DestinationUnreachable.from_bytes(
            io.BytesIO(bytearray([0x00, 0x00, 0x00, 0x00]) + data))

        # THEN
        self.assertEqual(data, icmpv6_dest_unreachable.data)

    def test_should_raise_RuntimeError_when_from_bytes_method_is_called(self):
        # A non-zero leading 32-bit word must make from_bytes() raise RuntimeError.

        # GIVEN
        data = any_data()

        # random.randint() includes both endpoints, so the upper bound has to be
        # 2**32 - 1: the value 2**32 does not fit the ">I" format and would make
        # struct.pack() raise struct.error before from_bytes() is ever called.
        unused = random.randint(1, (1 << 32) - 1)

        # WHEN / THEN
        self.assertRaises(RuntimeError, ICMPv6DestinationUnreachable.from_bytes,
                          io.BytesIO(bytearray(struct.pack(">I", unused)) + data))
1337
1338
class TestICMPv6DestinationUnreachableFactory(unittest.TestCase):
    """Parsing checks for ICMPv6DestinationUnreachableFactory."""

    def test_should_create_ICMPv6DestinationUnreachable_when_parse_method_is_called(self):
        # GIVEN
        icmp_data = any_data()

        factory = ICMPv6DestinationUnreachableFactory()

        # Four zero octets in front of the data.
        leading_zero_word = bytearray(4)
        encoded = leading_zero_word + icmp_data

        # WHEN
        parsed = factory.parse(io.BytesIO(encoded), any_message_info())

        # THEN
        self.assertEqual(icmp_data, parsed.data)
1354
1355
class TestUDPHeaderFactory(unittest.TestCase):
    """Parsing checks for UDPHeaderFactory."""

    def test_should_create_UDPHeader_when_to_bytes_method_is_called(self):
        # NOTE: despite the method name, the call under test is factory.parse().

        # GIVEN
        factory = UDPHeaderFactory()

        src_port = any_port()
        dst_port = any_port()
        payload_length = any_payload_length()
        checksum = any_checksum()

        # Four consecutive network-order (big-endian) unsigned 16-bit fields.
        encoded = struct.pack("!HHHH", src_port, dst_port, payload_length, checksum)

        # WHEN
        parsed = factory.parse(io.BytesIO(encoded), any_message_info())

        # THEN
        self.assertEqual(src_port, parsed.src_port)
        self.assertEqual(dst_port, parsed.dst_port)
        self.assertEqual(payload_length, parsed.payload_length)
        self.assertEqual(checksum, parsed.checksum)
1378
1379
class TestHopByHopOptionsFactory(unittest.TestCase):
    """Parsing checks for HopByHopOptionsFactory."""

    def test_should_create_option_from_bytearray_when_to_bytes_method_is_called(self):
        # NOTE: despite the method name, the call under test is factory.parse().

        # GIVEN

        class RawValueOptionFactory:
            # Stand-in option factory: returns whatever is left in the stream it is given.

            def parse(self, data, message_info):
                return data.read()

        factory = HopByHopOptionsFactory(options_factories={2: RawValueOptionFactory()})

        # An option with type 0x02 and length 3 (three value octets), followed by 0x01 0x00.
        option_octets = [0x02, 0x03, 0x11, 0x22, 0x33]
        trailing_octets = [0x01, 0x00]
        encoded = bytearray(option_octets + trailing_octets)

        # WHEN
        parsed_options = factory.parse(io.BytesIO(encoded), any_message_info())

        # THEN
        # Exactly one option is expected back; the trailing pair is presumably
        # consumed as padding (not verifiable from this test alone).
        self.assertEqual(1, len(parsed_options))
        first_option = parsed_options[0]
        self.assertEqual(2, first_option.header.type)
        self.assertEqual(3, first_option.header.length)
1401
1402
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1405