1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import abc
31import io
32import ipaddress
33import struct
34
35from binascii import hexlify
36
37import common
38
39try:
40    from itertools import izip_longest as zip_longest
41except ImportError:
42    from itertools import zip_longest
43
# Next header values identifying IPv6 extension headers and upper layer protocols
IPV6_NEXT_HEADER_HOP_BY_HOP = 0
IPV6_NEXT_HEADER_TCP = 6
IPV6_NEXT_HEADER_UDP = 17
IPV6_NEXT_HEADER_FRAGMENT = 44
IPV6_NEXT_HEADER_ICMP = 58

# Next header values which denote an upper layer protocol. IPv6PacketFactory
# treats every next header value outside of this list as an extension header.
UPPER_LAYER_PROTOCOLS = [
    IPV6_NEXT_HEADER_TCP,
    IPV6_NEXT_HEADER_UDP,
    IPV6_NEXT_HEADER_ICMP,
]

# ICMP message type values (they match the ICMPv6 message types of RFC 4443)
ICMP_DESTINATION_UNREACHABLE = 1
ICMP_TIME_EXCEEDED = 3
ICMP_ECHO_REQUEST = 128
ICMP_ECHO_RESPONSE = 129

# Default hop limit for IPv6
HOP_LIMIT_DEFAULT = 64
65
66
def calculate_checksum(data):
    """ Compute the 16-bit one's complement checksum of a byte sequence.

    This is the checksum algorithm used by upper layer protocols carried in IPv6 (RFC 2460):
        https://tools.ietf.org/html/rfc2460#page-27

    Args:
        data (bytes): bytes to be summed; an odd-length input is treated as if it had a trailing zero byte

    Returns:
        int: calculated checksum; 0xffff is returned in place of a computed zero
    """
    total = 0

    # Pair even- and odd-indexed bytes into big-endian 16-bit words.
    # Example: data[0] = 0x01, data[1] = 0xb2 => 0x01b2
    for high, low in zip_longest(data[::2], data[1::2], fillvalue=0x00):
        total += (high << 8) | low
        # End-around carry: fold whatever overflowed 16 bits back into the sum.
        total = (total >> 16) + (total & 0xffff)

    complement = total ^ 0xffff

    # Zero is reserved to mean "no valid checksum", so it is never returned.
    return complement if complement != 0 else 0xffff
94
95
def synthesize_ip6_address(ip6_network: ipaddress.IPv6Network,
                           ip4_address: ipaddress.IPv4Address) -> ipaddress.IPv6Address:
    """ Build the NAT64 IPv6 address which embeds the given IPv4 address.

    The IPv4 address is placed in the low 32 bits of the NAT64 prefix.
    Only /96 prefixes are supported for now.

    Args:
        ip6_network: The network (prefix) used for NAT64.
        ip4_address: The IPv4 address to embed.

    Returns:
        ipaddress.IPv6Address: The synthesized IPv6 address.

    Raises:
        NotImplementedError: If the prefix length of ip6_network is not 96.
    """
    if ip6_network.prefixlen == 96:
        prefix_bits = int(ip6_network.network_address)
        embedded_bits = int(ip4_address)
        return ipaddress.IPv6Address(prefix_bits | embedded_bits)

    # We are only using /96 networks in openthread
    raise NotImplementedError("synthesize_ip6_address only supports /96 networks")
113
114
class PacketFactory:
    """ Base interface of the factories which build objects out of raw data. """

    def parse(self, data, message_info):
        """ Build an object from the given data.

        This base implementation only raises; subclasses provide the parsing.

        Args:
            data (BytesIO)
            message_info (MessageInfo)

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()
127
128
class BuildableFromBytes:
    """ Base interface of the classes whose instances can be created from bytes. """

    @classmethod
    def from_bytes(cls, data):
        """ Create an object from the given data.

        This base implementation only raises; subclasses provide the conversion.

        Args:
            data (bytes)

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()
141
142
class ConvertibleToBytes:
    """ Base interface of the classes whose instances can be serialized to bytes. """

    def to_bytes(self):
        """ Serialize the object.

        This base implementation only raises; subclasses provide the conversion.

        Returns:
            bytes

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()

    def __len__(self):
        """ Size of the serialized object (in bytes).

        This base implementation only raises; subclasses provide the length.

        Returns:
            int

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()
161
162
class Header(object):
    """ Interface for header classes. """

    # NOTE: this Python 2 style metaclass declaration is ignored by Python 3,
    # so the abstractness of `type` is not enforced when a subclass is
    # instantiated. It is kept unchanged so that existing subclasses which do
    # not override `type` keep working.
    __metaclass__ = abc.ABCMeta

    # abc.abstractproperty is deprecated; stacking property on top of
    # abc.abstractmethod is the supported spelling and behaves the same.
    @property
    @abc.abstractmethod
    def type(self):
        """ Number which can be used in the next header field in IPv6 header or next headers.

        Returns:
            int
        """
175
176
class ExtensionHeader:
    """ Common base of the classes representing IPv6 extension headers.

    It stores the two values given to the constructor: next_header and hdr_ext_len.
    """

    def __init__(self, next_header, hdr_ext_len=0):
        self.hdr_ext_len = hdr_ext_len
        self.next_header = next_header
183
184
class UpperLayerProtocol(Header, ConvertibleToBytes):
    """ Common base of the classes representing an upper layer protocol payload of an IPv6 packet.

    The checksum is not stored here; it is read from and written to the wrapped protocol header.
    """

    def __init__(self, header):
        self.header = header

    @property
    def checksum(self):
        """ Checksum currently stored in the upper layer protocol header. """
        stored = self.header.checksum
        return stored

    @checksum.setter
    def checksum(self, value):
        """ Store a new checksum in the upper layer protocol header. """
        self.header.checksum = value

    def is_valid_checksum(self):
        """ Tell whether the stored checksum is considered valid.

        The checksum calculation never yields zero, so a stored zero
        means that the checksum is invalid.

        Returns:
            bool
        """
        current = self.checksum
        return current != 0
211
212
class IPv6PseudoHeader(ConvertibleToBytes):
    """ Class representing IPv6 pseudo header which is required to calculate
    upper layer protocol (like e.g. UDP or ICMPv6) checksum.

    This class is used only during upper layer protocol checksum calculation. Do not use it outside of this module.

    Args:
        source_address: source address in any form accepted by ipaddress.ip_address(); a bytearray is accepted too
        destination_address: destination address, in the same forms as source_address
        payload_length (int): value serialized as the 32-bit length field
        next_header (int): value serialized as the 32-bit next header field

    """

    def __init__(self, source_address, destination_address, payload_length, next_header):
        self._source_address = self._convert_to_ipaddress(source_address)
        self._destination_address = self._convert_to_ipaddress(destination_address)
        self.payload_length = payload_length
        self.next_header = next_header

    def _convert_to_ipaddress(self, value):
        """ Convert value to an ipaddress address object. """
        # bytearray is converted to bytes first, the form ipaddress.ip_address() is given for packed addresses.
        if isinstance(value, bytearray):
            value = bytes(value)

        return ipaddress.ip_address(value)

    @property
    def source_address(self):
        return self._source_address

    @source_address.setter
    def source_address(self, value):
        self._source_address = self._convert_to_ipaddress(value)

    @property
    def destination_address(self):
        return self._destination_address

    @destination_address.setter
    def destination_address(self, value):
        # Store into the destination field (the setter used to overwrite the source address).
        self._destination_address = self._convert_to_ipaddress(value)

    def to_bytes(self):
        """ Serialize the pseudo header.

        Layout: source address, destination address, 32-bit big-endian payload length,
        32-bit big-endian next header.

        Returns:
            bytearray
        """
        data = bytearray()
        data += self.source_address.packed
        data += self.destination_address.packed
        data += struct.pack(">I", self.payload_length)
        data += struct.pack(">I", self.next_header)

        return data
257
258
class IPv6Header(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing IPv6 packet header.

    Args:
        source_address: source address in any form accepted by ipaddress.ip_address(); a bytearray is accepted too
        destination_address: destination address, in the same forms as source_address
        traffic_class (int): traffic class value
        flow_label (int): flow label value
        hop_limit (int): hop limit value
        payload_length (int): length of the data following this header (in bytes)
        next_header (int): type of the header which follows this header

    """

    _version = 6

    _header_length = 40

    def __init__(
        self,
        source_address,
        destination_address,
        traffic_class=0,
        flow_label=0,
        hop_limit=64,
        payload_length=0,
        next_header=0,
    ):
        self.version = self._version
        self._source_address = self._convert_to_ipaddress(source_address)
        self._destination_address = self._convert_to_ipaddress(destination_address)
        self.traffic_class = traffic_class
        self.flow_label = flow_label
        self.hop_limit = hop_limit
        self.payload_length = payload_length
        self.next_header = next_header

    def _convert_to_ipaddress(self, value):
        """ Convert value to an ipaddress address object. """
        # bytearray is converted to bytes first, the form ipaddress.ip_address() is given for packed addresses.
        if isinstance(value, bytearray):
            value = bytes(value)

        return ipaddress.ip_address(value)

    @property
    def source_address(self):
        return self._source_address

    @source_address.setter
    def source_address(self, value):
        self._source_address = self._convert_to_ipaddress(value)

    @property
    def destination_address(self):
        return self._destination_address

    @destination_address.setter
    def destination_address(self, value):
        # Mirrors the source_address setter so that both addresses can be updated the same way.
        self._destination_address = self._convert_to_ipaddress(value)

    def to_bytes(self):
        """ Serialize the header into its 40 byte wire format.

        Returns:
            bytearray
        """
        # First four bytes: version (4 bits) | traffic class (8 bits) | flow label (20 bits)
        data = bytearray([
            ((self.version & 0x0F) << 4) | ((self.traffic_class >> 4) & 0x0F),
            ((self.traffic_class & 0x0F) << 4) | ((self.flow_label >> 16) & 0x0F),
            ((self.flow_label >> 8) & 0xff),
            ((self.flow_label & 0xff)),
        ])
        data += struct.pack(">H", self.payload_length)
        data += bytearray([self.next_header, self.hop_limit])
        data += self.source_address.packed
        data += self.destination_address.packed

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Read an IPv6 header from a stream.

        Args:
            data (BytesIO): stream positioned at the first byte of the header

        Returns:
            IPv6Header
        """
        b = bytearray(data.read(4))

        # The version nibble (upper four bits of b[0]) is not checked; the
        # created header always reports version 6.
        traffic_class = ((b[0] & 0x0F) << 4) | ((b[1] >> 4) & 0x0F)
        flow_label = ((b[1] & 0x0F) << 16) | (b[2] << 8) | b[3]

        payload_length = struct.unpack(">H", data.read(2))[0]
        next_header = ord(data.read(1))
        hop_limit = ord(data.read(1))
        src_addr = bytearray(data.read(16))
        dst_addr = bytearray(data.read(16))

        return cls(
            src_addr,
            dst_addr,
            traffic_class,
            flow_label,
            hop_limit,
            payload_length,
            next_header,
        )

    def __repr__(self):
        # Adjacent literals are used so that no source indentation leaks into the text.
        return ("IPv6Header(source_address={}, destination_address={}, next_header={}, payload_length={}, "
                "hop_limit={}, traffic_class={}, flow_label={})").format(
                    self.source_address.compressed,
                    self.destination_address.compressed,
                    self.next_header,
                    self.payload_length,
                    self.hop_limit,
                    self.traffic_class,
                    self.flow_label,
                )

    def __len__(self):
        return self._header_length
355
356
class IPv6Packet(ConvertibleToBytes):
    """ Class representing IPv6 packet.

    An IPv6 packet is made of an IPv6 header, optional extension headers, and an upper layer protocol.

                                            IPv6 packet

    +-------------+----------------------------------+----------------------------------------------+
    |             |                                  |                                              |
    | IPv6 header | extension headers (zero or more) | upper layer protocol (e.g. UDP, TCP, ICMPv6) |
    |             |                                  |                                              |
    +-------------+----------------------------------+----------------------------------------------+

    Extension headers:
        - HopByHop
        - Routing header (not implemented in this module)
        - Fragment Header

    Upper layer protocols:
        - ICMPv6
        - UDP
        - TCP (not implemented in this module)

    On construction the next header values of the header chain are filled in and,
    when the upper layer protocol does not carry a valid checksum, the checksum is calculated.

    Example:
        IPv6 packet construction without extension headers:

        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
                                 ICMPv6(ICMPv6Header(128, 0),
                                        ICMPv6EchoBody(0, 2,  bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                                     0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                                     0x41, 0x41]))))

        IPv6 packet construction with extension headers:

        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
                                 ICMPv6(ICMPv6Header(128, 0),
                                        ICMPv6EchoBody(0, 2,  bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                                     0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                                     0x41, 0x41])),
                                 [HopByHop(options=[
                                     HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                                    MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))
                                 ])])

    """

    def __init__(self, ipv6_header, upper_layer_protocol, extension_headers=None):
        self.ipv6_header = ipv6_header
        self.upper_layer_protocol = upper_layer_protocol
        self.extension_headers = [] if extension_headers is None else extension_headers

        self._update_next_header_values_in_headers()

        # A checksum which is already valid is left untouched.
        if not upper_layer_protocol.is_valid_checksum():
            self.upper_layer_protocol.checksum = self.calculate_checksum()

    def _validate_checksum(self):
        """ Raise RuntimeError when the stored checksum differs from the calculated one. """
        expected = self.calculate_checksum()
        stored = self.upper_layer_protocol.checksum

        if stored != expected:
            raise RuntimeError("Could not create IPv6 packet. "
                               "Invalid checksum: {}!={}".format(self.upper_layer_protocol.checksum, expected))

        self.upper_layer_protocol.checksum = expected

    def _update_payload_length_value_in_ipv6_header(self):
        """ Set the IPv6 payload length to the size of the upper layer protocol plus all extension headers. """
        upper_layer_length = len(self.upper_layer_protocol)
        extension_headers_length = sum(len(header) for header in self.extension_headers)

        self.ipv6_header.payload_length = upper_layer_length + extension_headers_length

    def _update_next_header_values_in_headers(self):
        """ Chain the headers: each header's next_header is set to the type of its successor. """
        previous = self.ipv6_header

        for current in self.extension_headers:
            previous.next_header = current.type
            previous = current

        previous.next_header = self.upper_layer_protocol.type

    def calculate_checksum(self):
        """ Calculate the upper layer protocol checksum over the pseudo header and the protocol bytes.

        The protocol is serialized with its checksum field set to zero; the
        checksum stored before the call is put back afterwards.

        Returns:
            int: calculated checksum
        """
        protocol = self.upper_layer_protocol

        original_checksum = protocol.checksum
        protocol.checksum = 0
        protocol_bytes = protocol.to_bytes()
        protocol.checksum = original_checksum

        pseudo_header = IPv6PseudoHeader(
            self.ipv6_header.source_address,
            self.ipv6_header.destination_address,
            len(protocol_bytes),
            self.upper_layer_protocol.type,
        )

        return calculate_checksum(pseudo_header.to_bytes() + protocol_bytes)

    def to_bytes(self):
        """ Serialize the whole packet.

        Payload length, next header values and the upper layer protocol checksum
        are refreshed before the serialization.
        """
        self._update_payload_length_value_in_ipv6_header()
        self._update_next_header_values_in_headers()
        self.upper_layer_protocol.checksum = self.calculate_checksum()

        packet_bytes = self.ipv6_header.to_bytes()

        for header in self.extension_headers:
            packet_bytes += header.to_bytes()

        packet_bytes += self.upper_layer_protocol.to_bytes()

        return packet_bytes

    def __repr__(self):
        template = "IPv6Packet(header={}, upper_layer_protocol={})"
        return template.format(self.ipv6_header, self.upper_layer_protocol)
471
472
class UDPHeader(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing UDP datagram header.

    A UDP datagram cannot be constructed without this header.

    Note that the payload_length setter adds the header length (8 bytes) to
    the assigned value, while the value given to the constructor is stored as is.

    """

    _header_length = 8

    def __init__(self, src_port, dst_port, payload_length=0, checksum=0):
        self.src_port, self.dst_port = src_port, dst_port

        self._payload_length = payload_length
        self.checksum = checksum

    @property
    def type(self):
        return 17

    @property
    def payload_length(self):
        return self._payload_length

    @payload_length.setter
    def payload_length(self, value):
        # The stored length covers the header as well as the payload.
        self._payload_length = value + self._header_length

    def to_bytes(self):
        """ Serialize the header as four big-endian 16-bit fields.

        Returns:
            bytes
        """
        fields = (self.src_port, self.dst_port, self.payload_length, self.checksum)

        return b"".join([struct.pack(">H", field) for field in fields])

    @classmethod
    def from_bytes(cls, data):
        """ Read four big-endian 16-bit fields from the stream and build the header out of them. """
        src_port, dst_port, payload_length, checksum = [struct.unpack(">H", data.read(2))[0] for _ in range(4)]

        return cls(src_port, dst_port, payload_length, checksum)

    def __len__(self):
        return self._header_length
520
521
class UDPDatagram(UpperLayerProtocol):
    """ Class representing UDP datagram.

    UDP is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.

    This class consists of a UDP header and payload. The payload is either a
    bytes-like object (bytes, bytearray, memoryview) or an object providing
    to_bytes() and __len__(). The example below shows how a UDP datagram can be constructed.

    Example:
        udp_dgram = UDPDatagram(UDPHeader(src_port=19788, dst_port=19788),
                                bytes([0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                       0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03,
                                       0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80,
                                       0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef]))

    """

    @property
    def type(self):
        return 17

    def __init__(self, header, payload):
        super(UDPDatagram, self).__init__(header)
        self.payload = payload

    def _payload_to_bytes(self):
        """ Return the payload as bytes, whichever of the supported payload forms is stored. """
        # Raw bytes-like payloads (as in the class example) have no to_bytes() method.
        if isinstance(self.payload, (bytes, bytearray, memoryview)):
            return bytes(self.payload)

        return self.payload.to_bytes()

    def to_bytes(self):
        """ Serialize the datagram: header followed by payload.

        The length field of the header is refreshed from the current payload first.

        Returns:
            bytearray
        """
        self.header.payload_length = len(self.payload)

        data = bytearray()
        data += self.header.to_bytes()
        data += self._payload_to_bytes()
        return data

    def __len__(self):
        return len(self.header) + len(self.payload)
556
557
class ICMPv6Header(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing ICMPv6 message header.

    An ICMPv6 message cannot be constructed without this header.
    The header holds three fields: type, code and checksum.

    """

    _header_length = 4

    def __init__(self, _type, code, checksum=0):
        self.type, self.code = _type, code

        self.checksum = checksum

    def to_bytes(self):
        """ Serialize the header: type byte, code byte, big-endian 16-bit checksum.

        Returns:
            bytearray
        """
        type_and_code = bytearray([self.type, self.code])
        checksum_bytes = struct.pack(">H", self.checksum)

        return type_and_code + checksum_bytes

    @classmethod
    def from_bytes(cls, data):
        """ Read type, code and checksum from the stream and build the header out of them. """
        _type = ord(data.read(1))
        code = ord(data.read(1))
        (checksum,) = struct.unpack(">H", data.read(2))

        return cls(_type, code, checksum)

    def __len__(self):
        return self._header_length
586
587
class ICMPv6(UpperLayerProtocol):
    """ Class representing ICMPv6 message.

    ICMPv6 is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.

    An ICMPv6 message is made of an ICMPv6 header and a body. The example below shows how to construct one.

    Example:
        icmpv6_msg = ICMPv6(ICMPv6Header(128, 0),
                            ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                        0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                        0x41, 0x41])))

    """

    @property
    def type(self):
        return 58

    def __init__(self, header, body):
        super(ICMPv6, self).__init__(header)
        self.body = body

    def to_bytes(self):
        """ Serialize the message: header bytes followed by body bytes.

        Returns:
            bytearray
        """
        header_bytes = self.header.to_bytes()
        body_bytes = self.body.to_bytes()

        return bytearray(header_bytes + body_bytes)

    def __len__(self):
        header_length = len(self.header)
        return header_length + len(self.body)
616
617
class FragmentHeader(ExtensionHeader):
    """ Class representing Fragment extension header.

    +-------------+----------+-----------------+-----+---+----------------+
    | Next Header | Reserved | Fragment Offset | Res | M | Identification |
    +-------------+----------+-----------------+-----+---+----------------+

    Fragment extension header consists of:
        - next_header type (8 bit)
        - fragment offset which is multiple of 8 (13 bit)
        - more_flag to indicate further data (1 bit)
        - identification for all associated fragments (32 bit)

    Args:
        next_header (int): type of the header which follows this header
        fragm_offset (int): fragment offset, expressed in 8-octet units
        more_flag (bool or int): set when more fragments follow
        identification (int): identification shared by all fragments of one packet
    """

    # The serialized header is 8 bytes long (64 bits); lengths are reported in bytes.
    _header_length = 8

    @property
    def type(self):
        return 44

    @property
    def identification(self):
        return self._identification

    @property
    def more_flag(self):
        return self._more_flag

    @property
    def offset(self):
        return self._fragm_offset

    def __init__(self, next_header=None, fragm_offset=0, more_flag=False, identification=0):
        super(FragmentHeader, self).__init__(next_header, 0)
        self._fragm_offset = fragm_offset
        self._more_flag = more_flag
        self._identification = identification

    def calculate_offset(self, position):
        """ Convert a byte position into a fragment offset (8-octet units). """
        return position >> 3

    # Original, misspelled name of calculate_offset; kept so that existing callers keep working.
    callculate_offset = calculate_offset

    def to_bytes(self):
        """ Serialize the header into its 8 byte wire format.

        Returns:
            bytearray
        """
        data = bytearray([self.next_header, 0x00])
        # 16-bit field: fragment offset in the upper 13 bits, M flag in the lowest bit.
        data += bytearray([self._fragm_offset >> 5, ((self._fragm_offset << 3) | self._more_flag) & 0xff])
        data += struct.pack(">I", self._identification)

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Read a Fragment header from a stream.

        Args:
            data (BytesIO): stream positioned at the first byte of the header

        Returns:
            FragmentHeader
        """
        next_header = struct.unpack(">B", data.read(1))[0]
        struct.unpack(">B", data.read(1))  # reserved byte, value is ignored
        offset_and_flags = struct.unpack(">H", data.read(2))[0]
        identification = struct.unpack(">I", data.read(4))[0]

        more_flag = offset_and_flags & 0x1
        fragment_offset = offset_and_flags >> 3

        return cls(next_header, fragment_offset, more_flag, identification)

    def __len__(self):
        """ Length of the header in bytes (previously reported as 64, i.e. in bits). """
        return self._header_length
678
679
class HopByHop(ExtensionHeader):
    """ Class representing HopByHop extension header.

    The header is made of:
        - next_header type
        - extension header length, counted in 8-byte units beyond the first 8 bytes
        - options

    When hdr_ext_len is not given, it is calculated from the options.

    """

    _one_byte_padding = 0x00
    _many_bytes_padding = 0x01

    @property
    def type(self):
        return 0

    def __init__(self, next_header=None, options=None, hdr_ext_len=None):
        super(HopByHop, self).__init__(next_header, hdr_ext_len)
        self.options = [] if options is None else options

        if hdr_ext_len is None:
            hdr_ext_len = self._calculate_hdr_ext_len(self._calculate_payload_length())

        self.hdr_ext_len = hdr_ext_len

    def _calculate_payload_length(self):
        """ Unpadded header size: two bytes (next header, hdr ext len) plus every option. """
        return 2 + sum(len(option) for option in self.options)

    def _calculate_hdr_ext_len(self, payload_length):
        """ Translate an unpadded size in bytes into the hdr_ext_len value. """
        whole_units, remainder = divmod(payload_length, 8)

        # An exact multiple of 8 needs no extra unit for the padding.
        if remainder == 0 and whole_units > 0:
            return whole_units - 1

        return whole_units

    def to_bytes(self):
        """ Serialize the header and pad the result to a multiple of 8 bytes.

        Returns:
            bytearray
        """
        data = bytearray([self.next_header, self.hdr_ext_len])

        for option in self.options:
            data += option.to_bytes()

        # Padding
        #
        # More details:
        #   https://tools.ietf.org/html/rfc2460#section-4.2
        #
        remainder = len(data) & 0x7

        if remainder == 0:
            return data

        missing = 8 - remainder

        if missing == 1:
            # A single missing byte is filled with the one-byte padding option.
            data += bytearray([self._one_byte_padding])
        else:
            # Otherwise: option type, option length, then that many zero bytes.
            zero_count = missing - 2
            data += bytearray([self._many_bytes_padding, zero_count])
            data += bytearray(zero_count)

        return data

    def __len__(self):
        """ HopByHop extension header length (in bytes)

        More details:
            https://tools.ietf.org/html/rfc2460#section-4.3

        """
        return 8 * (self.hdr_ext_len + 1)
757
758
class HopByHopOptionHeader(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing HopByHop option header.

    The header is two bytes long: option type followed by option length.
    A length which is not given is stored as zero.
    """

    _header_length = 2

    def __init__(self, _type, length=None):
        self.type = _type
        self.length = 0 if length is None else length

    def to_bytes(self):
        fields = [self.type, self.length]
        return bytearray(fields)

    @classmethod
    def from_bytes(cls, data):
        """ Read the type byte and the length byte from the stream. """
        _type = ord(data.read(1))
        length = ord(data.read(1))

        return cls(_type, length)

    def __len__(self):
        return self._header_length

    def __repr__(self):
        template = "HopByHopOptionHeader(type={}, length={})"
        return template.format(self.type, self.length)
782
783
class HopByHopOption(ConvertibleToBytes):
    """ Class representing HopByHop option.

    An option is a pair: HopByHopOptionHeader and value (e.g. MPLOption).
    The length field of the given header is overwritten with the length of the value.

    The example below shows how a HopByHop option can be constructed.

    Example:
        HopByHop(next_header=0x3a,
                 options=[HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                         MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))

    """

    def __init__(self, header, value):
        self.value = value
        self.header = header

        # Keep the header consistent with the value it describes.
        header.length = len(value)

    def to_bytes(self):
        header_bytes = self.header.to_bytes()
        return header_bytes + self.value.to_bytes()

    def __len__(self):
        header_length = len(self.header)
        return header_length + len(self.value)

    def __repr__(self):
        template = "HopByHopOption(header={}, value={})"
        return template.format(self.header, self.value)
812
813
class MPLOption(ConvertibleToBytes):
    """ Class representing MPL option.

    The first byte carries S (2 bits), M (1 bit) and V (1 bit) in its upper
    four bits; it is followed by the sequence byte and the seed id. The value
    of S selects the seed id length.
    """

    _header_length = 2

    _seed_id_length = {0: 0, 1: 2, 2: 8, 3: 16}

    def __init__(self, S, M, V, sequence, seed_id):
        self.S, self.M, self.V = S, M, V
        self.sequence = sequence
        self.seed_id = seed_id

    def to_bytes(self):
        s_bits = (self.S & 0x03) << 6
        m_bit = (self.M & 0x01) << 5
        v_bit = (self.V & 0x01) << 4

        return bytearray([s_bits | m_bit | v_bit, self.sequence]) + self.seed_id

    @classmethod
    def from_bytes(cls, data):
        """ Read an MPL option from the stream; the seed id length follows from the S bits. """
        flags = ord(data.read(1))

        s = (flags >> 6) & 0x03
        m = (flags >> 5) & 0x01
        v = (flags >> 4) & 0x01

        sequence = ord(data.read(1))
        seed_id_size = cls._seed_id_length[s]
        seed_id = data.read(seed_id_size)

        return cls(s, m, v, sequence, seed_id)

    def __len__(self):
        return self._seed_id_length[self.S] + self._header_length

    def __repr__(self):
        template = "MPLOption(S={}, M={}, V={}, sequence={}, seed_id={})"
        return template.format(self.S, self.M, self.V, self.sequence, hexlify(self.seed_id))
852
853
class IPv6PacketFactory(PacketFactory):
    """ Factory that produces IPv6 packets from data.

    The factory has to be given the factories which parse extension headers and upper layer protocols.

    A preferable setup of IPv6PacketFactory is shown below.

    Header types:
        0: HopByHop
        17: UDP
        58: ICMPv6

    Option types:
        109: MPL

    ICMPv6 body types:
        128: Echo request
        129: Echo response

    Example usage:

        ipv6_factory = IPv6PacketFactory(
            ehf={
                0: HopByHopFactory(options_factories={
                    109: MPLOptionFactory()
                })
            },
            ulpf={
                17: UDPDatagramFactory(dst_port_factories={
                    19788: MLEMessageFactory(),
                    19789: CoAPMessageFactory()
                }),
                58: ICMPv6Factory(body_factories={
                    128: ICMPv6EchoBodyFactory(),
                    129: ICMPv6EchoBodyFactory()
                })
            }
        )

    """

    def __init__(self, ehf=None, ulpf=None):
        """
        ehf - Extension Header Factory
        ulpf - Upper Layer Protocol Factory

        Args:
            ehf(dict[int: PacketFactory]): Dictionary mapping extension header types on specialized factories.
            ulpf(dict[int: PacketFactory]): Dictionary mapping upper layer protocol types on specialized factories.
        """
        self._ehf = {} if ehf is None else ehf
        self._ulpf = {} if ulpf is None else ulpf

    def _is_extension_header(self, header_type):
        """ Every type which is not a known upper layer protocol counts as an extension header. """
        is_upper_layer = header_type in UPPER_LAYER_PROTOCOLS
        return not is_upper_layer

    def _get_extension_header_factory_for(self, next_header):
        """ Look up the extension header factory; RuntimeError is raised for an unknown type. """
        try:
            factory = self._ehf[next_header]
        except KeyError:
            raise RuntimeError("Could not get Extension Header factory for next_header={}.".format(next_header))

        return factory

    def _get_upper_layer_protocol_factory_for(self, next_header):
        """ Look up the upper layer protocol factory; RuntimeError is raised for an unknown type. """
        try:
            factory = self._ulpf[next_header]
        except KeyError:
            raise RuntimeError("Could not get Upper Layer Protocol factory for next_header={}.".format(next_header))

        return factory

    def _parse_extension_headers(self, data, next_header, message_info):
        """ Parse the chain of extension headers.

        Returns:
            tuple: next header value of the last parsed header, list of parsed extension headers
        """
        parsed_headers = []
        current_type = next_header

        while self._is_extension_header(current_type):
            header = self._get_extension_header_factory_for(current_type).parse(data, message_info)
            current_type = header.next_header
            parsed_headers.append(header)

        return current_type, parsed_headers

    def _parse_upper_layer_protocol(self, data, next_header, message_info):
        """ Parse the upper layer protocol with the factory registered for next_header. """
        return self._get_upper_layer_protocol_factory_for(next_header).parse(data, message_info)

    def parse(self, data, message_info):
        """ Parse a whole IPv6 packet.

        The source and destination addresses of the IPv6 header are stored in
        message_info before the remaining parts are parsed.
        """
        ipv6_header = IPv6Header.from_bytes(data)

        message_info.source_ipv6 = ipv6_header.source_address
        message_info.destination_ipv6 = ipv6_header.destination_address

        upper_layer_type, extension_headers = self._parse_extension_headers(data, ipv6_header.next_header,
                                                                            message_info)

        upper_layer_protocol = self._parse_upper_layer_protocol(data, upper_layer_type, message_info)

        return IPv6Packet(ipv6_header, upper_layer_protocol, extension_headers)
952
953
class HopByHopOptionsFactory(object):
    """ Factory that produces HopByHop options. """

    # Option types that are consumed as padding and never reported as options.
    _one_byte_padding = 0x00
    _many_bytes_padding = 0x01

    def __init__(self, options_factories=None):
        """
        Args:
            options_factories (dict): Maps an option type on the factory that
                parses the value of that option. Defaults to an empty dictionary.
        """
        if options_factories is None:
            options_factories = {}

        self._options_factories = options_factories

    def _get_HopByHopOption_value_factory(self, _type):
        """ Return the value factory registered for _type.

        Raises:
            RuntimeError: No factory is registered for _type.
        """
        try:
            value_factory = self._options_factories[_type]
        except KeyError:
            raise RuntimeError("Could not find HopByHopOption value factory for type={}.".format(_type))
        else:
            return value_factory

    def parse(self, data, message_info):
        """ Parse options until data is exhausted.

        Args:
            data (io.BytesIO): Stream holding the options; it has to provide
                tell(), getvalue() and read().
            message_info: Object handed unchanged to the value factories.

        Returns:
            list: HopByHopOption objects in the order they were read. Padding
                is skipped and not included.

        Raises:
            RuntimeError: An option type has no registered value factory.
        """
        parsed = []

        while data.tell() < len(data.getvalue()):
            header = HopByHopOptionHeader.from_bytes(data)
            kind = header.type

            if kind == self._one_byte_padding:
                # skip one byte padding
                # NOTE(review): this reads one more byte after the option header;
                # how many bytes from_bytes already consumed is not visible here - confirm.
                data.read(1)
                continue

            if kind == self._many_bytes_padding:
                # skip n bytes padding
                data.read(header.length)
                continue

            value_factory = self._get_HopByHopOption_value_factory(kind)

            # The value factory only sees the bytes that belong to this option.
            raw_value = data.read(header.length)
            value = value_factory.parse(io.BytesIO(raw_value), message_info)

            parsed.append(HopByHopOption(header, value))

        return parsed
996
997
class HopByHopFactory(PacketFactory):
    """ Factory that produces HopByHop extension headers from data. """

    def __init__(self, hop_by_hop_options_factory):
        """
        Args:
            hop_by_hop_options_factory: Factory whose parse() turns the
                options area of the header into a collection of options.
        """
        self._hop_by_hop_options_factory = hop_by_hop_options_factory

    def _calculate_extension_header_length(self, hdr_ext_len):
        """ Return the header length in bytes: (hdr_ext_len + 1) units of 8 bytes. """
        return 8 * (hdr_ext_len + 1)

    def parse(self, data, message_info):
        """ Parse one HopByHop extension header from data.

        Args:
            data: Stream positioned at the first byte of the header.
            message_info: Object whose payload_length is increased by the
                length of the parsed header; also handed to the options factory.

        Returns:
            HopByHop: The parsed extension header.
        """
        next_header = ord(data.read(1))
        hdr_ext_len = ord(data.read(1))

        total_length = self._calculate_extension_header_length(hdr_ext_len)

        # Note! Two bytes were read (next_header and hdr_ext_len) so they must
        # be subtracted from header length
        options_blob = data.read(total_length - 2)

        # The options factory works on its own stream limited to this header.
        options = self._hop_by_hop_options_factory.parse(io.BytesIO(options_blob), message_info)

        header = HopByHop(next_header, options, hdr_ext_len)
        message_info.payload_length += len(header)

        return header
1025
1026
class MPLOptionFactory(PacketFactory):
    """ Factory that produces MPL options for HopByHop extension header. """

    def parse(self, data, message_info):
        """ Build an MPLOption from data; message_info is not used. """
        mpl_option = MPLOption.from_bytes(data)
        return mpl_option
1032
1033
class UDPHeaderFactory:
    """ Factory that produces UDP header. """

    def parse(self, data, message_info):
        """ Build a UDPHeader from data; message_info is not used. """
        udp_header = UDPHeader.from_bytes(data)
        return udp_header
1039
1040
class UdpBasedOnSrcDstPortsPayloadFactory:

    # TODO: Unittests
    """ Factory that produces UDP payload.

    The payload parser is selected by looking up the destination and the
    source port of the message in a table of factories.
    """

    def __init__(self, src_dst_port_based_payload_factories):
        """
        Args:
            src_dst_port_based_payload_factories (dict):
                Maps a port on the factory that parses UDP payload
                sent from or to that port.
        """
        self._factories = src_dst_port_based_payload_factories

    def parse(self, data, message_info):
        """ Parse UDP payload with the factory matching the message ports.

        Args:
            data: Stream handed to the selected factory.
            message_info: Object providing dst_port and src_port.

        Returns:
            The payload built by the selected factory, or None when the
            destination port is common.UDP_TEST_PORT.

        Raises:
            RuntimeError: Neither port has a registered factory and the
                message is not targeted to the test port.
        """
        selected = None

        # The source port is examined last, so its factory wins when both
        # ports have one registered.
        for port in (message_info.dst_port, message_info.src_port):
            if port in self._factories:
                selected = self._factories[port]

        if message_info.dst_port == common.UDP_TEST_PORT:
            # Ignore traffic targeted to test port
            return None

        if selected is None:
            raise RuntimeError("Could not find factory to build UDP payload.")

        return selected.parse(data, message_info)
1071
1072
class UDPDatagramFactory(PacketFactory):

    # TODO: Unittests
    """ Factory that produces UDP datagrams. """

    def __init__(self, udp_header_factory, udp_payload_factory):
        """
        Args:
            udp_header_factory: Factory whose parse() builds the UDP header.
            udp_payload_factory: Factory whose parse() builds the UDP payload.
        """
        self._udp_payload_factory = udp_payload_factory
        self._udp_header_factory = udp_header_factory

    def parse(self, data, message_info):
        """ Parse a UDP datagram from data.

        Args:
            data: Stream positioned at the first byte of the UDP header; it
                has to provide tell() and getvalue().
            message_info: Object updated with payload_length, src_port and
                dst_port before the payload factory is called.

        Returns:
            UDPDatagram: Datagram composed of the parsed header and payload.
        """
        udp_header = self._udp_header_factory.parse(data, message_info)

        # Bytes still unread once the header has been consumed.
        unread = len(data.getvalue()) - data.tell()

        # Update message payload length: UDP header (8B) + payload length
        message_info.payload_length += len(udp_header) + unread

        # The ports have to be set before the payload factory receives message_info.
        message_info.src_port = udp_header.src_port
        message_info.dst_port = udp_header.dst_port

        udp_payload = self._udp_payload_factory.parse(data, message_info)

        return UDPDatagram(udp_header, udp_payload)
1094
1095
class ICMPv6Factory(PacketFactory):
    """ Factory that produces ICMPv6 messages from data. """

    def __init__(self, body_factories=None):
        """
        Args:
            body_factories (dict): Maps an ICMPv6 type on the factory that
                parses the body of such message. The factory stored under
                the "default" key, when present, is used for every type
                without its own entry. Defaults to an empty dictionary.
        """
        if body_factories is None:
            body_factories = {}

        self._body_factories = body_factories

    def _get_icmpv6_body_factory(self, _type):
        """ Return the body factory for _type, falling back to the "default" one.

        A message is printed whenever the default factory is used.

        Raises:
            RuntimeError: No factory is registered for _type and there is
                no "default" factory.
        """
        try:
            return self._body_factories[_type]

        except KeyError:
            if "default" in self._body_factories:
                fallback = self._body_factories["default"]

                print("Could not find specialized factory to parse ICMP body. "
                      "Take the default one: {}".format(type(fallback)))

                return fallback

            raise RuntimeError("Could not find specialized factory to parse ICMP body. "
                               "Unsupported ICMP type: {}".format(_type))

    def parse(self, data, message_info):
        """ Parse an ICMPv6 message from data.

        Args:
            data: Stream positioned at the first byte of the ICMPv6 header;
                it has to provide tell() and getvalue().
            message_info: Object whose payload_length is increased by the
                header length plus the number of bytes left in data.

        Returns:
            ICMPv6: Message composed of the parsed header and body.
        """
        header = ICMPv6Header.from_bytes(data)

        # The factory is resolved before message_info is touched, so an
        # unsupported type leaves payload_length unchanged.
        body_factory = self._get_icmpv6_body_factory(header.type)

        unread = len(data.getvalue()) - data.tell()
        message_info.payload_length += len(header) + unread

        body = body_factory.parse(data, message_info)

        return ICMPv6(header, body)
1126
1127
class ICMPv6EchoBodyFactory(PacketFactory):
    """ Factory that produces ICMPv6 echo message body. """

    def parse(self, data, message_info):
        """ Build an ICMPv6EchoBody from data; message_info is not used. """
        echo_body = ICMPv6EchoBody.from_bytes(data)
        return echo_body
1133
1134
class BytesPayload(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing bytes payload. """

    def __init__(self, data):
        # Kept exactly as given; no copy or conversion is made here.
        self.data = data

    def to_bytes(self):
        """ Return a new bytearray built from the stored data. """
        payload = bytearray(self.data)
        return payload

    @classmethod
    def from_bytes(cls, data):
        """ Wrap data in a new instance without reading or copying it. """
        instance = cls(data)
        return instance

    def __len__(self):
        """ Return the length of the stored data. """
        stored = self.data
        return len(stored)
1150
1151
class BytesPayloadFactory(PacketFactory):
    """ Factory that produces bytes payload. """

    def parse(self, data, message_info):
        """ Wrap everything left to read in data into a BytesPayload; message_info is not used. """
        remaining = data.read()
        return BytesPayload(remaining)
1157
1158
class ICMPv6EchoBody(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing body of ICMPv6 echo messages. """

    # Bytes taken by the identifier and the sequence number (2 + 2).
    _header_length = 4

    def __init__(self, identifier, sequence_number, data):
        self.identifier = identifier
        self.sequence_number = sequence_number
        self.data = data

    def to_bytes(self):
        """ Serialize identifier and sequence number as big-endian 16-bit values, followed by data. """
        fixed_part = struct.pack(">H", self.identifier) + struct.pack(">H", self.sequence_number)
        return fixed_part + self.data

    @classmethod
    def from_bytes(cls, data):
        """ Build the body from a stream.

        Two big-endian 16-bit values are read (identifier, then sequence
        number); everything left in the stream becomes the data.
        """
        (identifier,) = struct.unpack(">H", data.read(2))
        (sequence_number,) = struct.unpack(">H", data.read(2))
        rest = data.read()

        return cls(identifier, sequence_number, rest)

    def __len__(self):
        """ Return the length of the fixed part plus the length of data. """
        return len(self.data) + self._header_length
1184
1185
class ICMPv6DestinationUnreachableFactory(PacketFactory):
    """ Factory that produces ICMPv6 Destination Unreachable message body. """

    def parse(self, data, message_info):
        """ Build an ICMPv6DestinationUnreachable from data; message_info is not used. """
        body = ICMPv6DestinationUnreachable.from_bytes(data)
        return body
1191
1192
class ICMPv6DestinationUnreachable(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing body of ICMPv6 Destination Unreachable messages. """

    # Bytes taken by the unused field that precedes the data.
    _header_length = 4
    # Value written to, and required in, the unused field.
    _unused = 0

    def __init__(self, data):
        self.data = data

    def to_bytes(self):
        """ Serialize the unused field as a big-endian 32-bit value, followed by data.

        Returns:
            bytearray: The serialized body.
        """
        unused_field = bytearray(struct.pack(">I", self._unused))
        return unused_field + self.data

    @classmethod
    def from_bytes(cls, data):
        """ Build the body from a stream.

        The first four bytes are the unused field; everything left in the
        stream is stored as a bytearray.

        Raises:
            RuntimeError: The unused field is different from zero.
        """
        (unused,) = struct.unpack(">I", data.read(4))

        if unused:
            raise RuntimeError(
                "Invalid value of unused field in the ICMPv6 Destination Unreachable data. Expected value: 0.")

        remainder = bytearray(data.read())
        return cls(remainder)

    def __len__(self):
        """ Return the length of the unused field plus the length of data. """
        return len(self.data) + self._header_length
1218