1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import io
31import random
32import struct
33import unittest
34
35import common
36import config
37import mle
38import network_data
39
40
def any_address():
    """Return a random 16-bit unsigned integer to use as an address."""
    address = random.getrandbits(16)
    return address
43
44
def any_receiver():
    """Return a random single-bit value (0 or 1) for the receiver flag."""
    receiver_bit = random.getrandbits(1)
    return receiver_bit
47
48
def any_secure():
    """Return a random single-bit value (0 or 1) for the secure flag."""
    secure_bit = random.getrandbits(1)
    return secure_bit
51
52
def any_device_type():
    """Return a random single-bit value (0 or 1) for the device-type flag."""
    device_type_bit = random.getrandbits(1)
    return device_type_bit
55
56
def any_network_data():
    """Return a random single-bit value (0 or 1) for the network-data flag."""
    network_data_bit = random.getrandbits(1)
    return network_data_bit
59
60
# Expected field values for every 4-bit mode value, keyed by the raw value:
# bit 3 -> "receiver", bit 2 -> "secure", bit 1 -> "device_type",
# bit 0 -> "network_data". The loop nesting reproduces the key order of the
# hand-written table (0x00, 0x08, 0x04, 0x0C, ...).
mode_map = {
    (rx << 3) | (sec << 2) | (dev << 1) | net: {
        "receiver": rx,
        "secure": sec,
        "device_type": dev,
        "network_data": net,
    }
    for net in (0, 1)
    for dev in (0, 1)
    for sec in (0, 1)
    for rx in (0, 1)
}
159
160
def any_mode():
    """Return a random 4-bit mode value (0..15)."""
    mode_bits = random.getrandbits(4)
    return mode_bits
163
164
def any_timeout():
    """Return a random 32-bit unsigned integer to use as a timeout."""
    timeout = random.getrandbits(32)
    return timeout
167
168
def any_challenge():
    """Return a bytearray of 4 to 8 random bytes to use as a challenge."""
    size = random.randint(4, 8)
    octets = [random.getrandbits(8) for _ in range(size)]
    return bytearray(octets)
172
173
def any_response():
    """Return a bytearray of 4 to 8 random bytes to use as a response."""
    size = random.randint(4, 8)
    octets = [random.getrandbits(8) for _ in range(size)]
    return bytearray(octets)
177
178
def any_link_layer_frame_counter():
    """Return a random 32-bit unsigned integer to use as a link-layer frame counter."""
    counter = random.getrandbits(32)
    return counter
181
182
def any_mle_frame_counter():
    """Return a random 32-bit unsigned integer to use as an MLE frame counter."""
    counter = random.getrandbits(32)
    return counter
185
186
def any_output():
    """Return a random 2-bit value (0..3) for the output field."""
    output_bits = random.getrandbits(2)
    return output_bits
189
190
def any_input():
    """Return a random 2-bit value (0..3) for the input field."""
    input_bits = random.getrandbits(2)
    return input_bits
193
194
def any_route():
    """Return a random 4-bit value (0..15) for the route field."""
    route_bits = random.getrandbits(4)
    return route_bits
197
198
def any_router_id_mask():
    """Return a random 64-bit unsigned integer to use as a router id mask."""
    mask = random.getrandbits(64)
    return mask
201
202
def any_link_quality_and_route_data(length=None):
    """Return a list of random byte values (each 0..255).

    Args:
        length: Number of entries to generate. When None, a random length
            between 0 and 63 (inclusive) is chosen.

    Returns:
        A list of ``length`` random integers, each fitting in 8 bits.
    """
    if length is None:
        length = random.randint(0, 63)

    entries = []
    for _ in range(length):
        entries.append(random.getrandbits(8))
    return entries
206
207
def any_partition_id():
    """Return a random 32-bit unsigned integer to use as a partition id."""
    partition_id = random.getrandbits(32)
    return partition_id
210
211
def any_weighting():
    """Return a random 8-bit unsigned integer to use as a weighting."""
    weighting = random.getrandbits(8)
    return weighting
214
215
def any_data_version():
    """Return a random 8-bit unsigned integer to use as a data version."""
    version = random.getrandbits(8)
    return version
218
219
def any_stable_data_version():
    """Return a random 8-bit unsigned integer to use as a stable data version."""
    version = random.getrandbits(8)
    return version
222
223
def any_leader_router_id():
    """Return a random 8-bit unsigned integer to use as a leader router id."""
    router_id = random.getrandbits(8)
    return router_id
226
227
# Expected field values for each scan-mask byte, keyed by the raw byte:
# bit 7 -> "router", bit 6 -> "end_device". Keys are produced in the order
# 0x00, 0x40, 0x80, 0xC0.
scan_mask_map = {
    (router_bit << 7) | (end_device_bit << 6): {
        "router": router_bit,
        "end_device": end_device_bit,
    }
    for router_bit in (0, 1)
    for end_device_bit in (0, 1)
}
246
247
def any_scan_mask_router():
    """Return a random single-bit value (0 or 1) for the scan-mask router flag."""
    router_bit = random.getrandbits(1)
    return router_bit
250
251
def any_scan_mask_end_device():
    """Return a random single-bit value (0 or 1) for the scan-mask end-device flag."""
    end_device_bit = random.getrandbits(1)
    return end_device_bit
254
255
def any_scan_mask():
    """Return a random scan-mask byte: two random bits placed in bits 7 and 6."""
    top_bits = random.getrandbits(2)
    return top_bits << 6
258
259
def any_link_margin():
    """Return a random 8-bit unsigned integer to use as a link margin."""
    margin = random.getrandbits(8)
    return margin
262
263
def any_status():
    """Return a random 8-bit unsigned integer to use as a status."""
    status = random.getrandbits(8)
    return status
266
267
def any_version():
    """Return a random 16-bit unsigned integer to use as a version."""
    version = random.getrandbits(16)
    return version
270
271
def any_channel_page():
    """Return a random 8-bit unsigned integer to use as a channel page."""
    page = random.getrandbits(8)
    return page
274
275
def any_channel():
    """Return a random 16-bit unsigned integer to use as a channel."""
    channel = random.getrandbits(16)
    return channel
278
279
def any_pan_id():
    """Return a random 16-bit unsigned integer to use as a PAN id."""
    pan_id = random.getrandbits(16)
    return pan_id
282
283
def any_timestamp_seconds():
    """Return a random 48-bit unsigned integer for the timestamp seconds field."""
    seconds = random.getrandbits(48)
    return seconds
286
287
def any_timestamp_ticks():
    """Return a random 15-bit unsigned integer for the timestamp ticks field."""
    ticks = random.getrandbits(15)
    return ticks
290
291
def any_u():
    """Return a random single-bit value (0 or 1) for the ``u`` flag."""
    u_bit = random.getrandbits(1)
    return u_bit
294
295
def any_pp():
    """Return a random ``pp`` byte: two random bits placed in bits 7 and 6."""
    top_bits = random.getrandbits(2)
    return top_bits << 6
298
299
def any_link_quality_3():
    """Return a random 8-bit unsigned integer for the link-quality-3 field."""
    quality = random.getrandbits(8)
    return quality
302
303
def any_link_quality_2():
    """Return a random 8-bit unsigned integer for the link-quality-2 field."""
    quality = random.getrandbits(8)
    return quality
306
307
def any_link_quality_1():
    """Return a random 8-bit unsigned integer for the link-quality-1 field."""
    quality = random.getrandbits(8)
    return quality
310
311
def any_leader_cost():
    """Return a random 8-bit unsigned integer to use as a leader cost."""
    cost = random.getrandbits(8)
    return cost
314
315
def any_id_sequence():
    """Return a random 8-bit unsigned integer to use as an id sequence."""
    sequence = random.getrandbits(8)
    return sequence
318
319
def any_active_routers():
    """Return a random 8-bit unsigned integer for the active-routers field."""
    active = random.getrandbits(8)
    return active
322
323
def any_sed_buffer_size():
    """Return a random 16-bit unsigned integer for the SED buffer-size field."""
    buffer_size = random.getrandbits(16)
    return buffer_size
326
327
def any_sed_datagram_count():
    """Return a random 8-bit unsigned integer for the SED datagram-count field."""
    datagram_count = random.getrandbits(8)
    return datagram_count
330
331
def any_tlvs(length=None):
    """Return a list of random byte values (each 0..255).

    Args:
        length: Number of entries to generate. When None, a random length
            between 0 and 16 (inclusive) is chosen.

    Returns:
        A list of ``length`` random integers, each fitting in 8 bits.
    """
    count = random.randint(0, 16) if length is None else length

    tlvs = []
    for _ in range(count):
        tlvs.append(random.getrandbits(8))
    return tlvs
337
338
def any_cid():
    """Return a random 4-bit value (0..15) to use as a context id."""
    cid = random.getrandbits(4)
    return cid
341
342
def any_iid():
    """Return a bytearray of 8 random bytes to use as an interface identifier."""
    octets = bytearray()
    for _ in range(8):
        octets.append(random.getrandbits(8))
    return octets
345
346
def any_ipv6_address():
    """Return a bytearray of 16 random bytes to use as an IPv6 address."""
    octets = bytearray()
    for _ in range(16):
        octets.append(random.getrandbits(8))
    return octets
349
350
def any_addresses():
    """Return a two-element list of random addresses.

    The first entry is an ``mle.AddressCompressed`` built from a random
    context id and interface identifier; the second is an ``mle.AddressFull``
    built from a random 16-byte address.
    """
    compressed = mle.AddressCompressed(any_cid(), any_iid())
    full = mle.AddressFull(any_ipv6_address())
    return [compressed, full]
355
356
def any_key_id_mode():
    """Return a random 2-bit value (0..3) to use as a key-id mode."""
    mode = random.getrandbits(2)
    return mode
359
360
def any_security_level():
    """Return a random 3-bit value (0..7) to use as a security level."""
    level = random.getrandbits(3)
    return level
363
364
def any_frame_counter():
    """Return a random 32-bit unsigned integer to use as a frame counter."""
    counter = random.getrandbits(32)
    return counter
367
368
def any_key_id(key_id_mode):
    """Return a random key identifier sized for the given key-id mode.

    Args:
        key_id_mode: Key-id mode; 0, 1, 2 and 3 select a key id of
            0, 1, 5 and 9 bytes respectively.

    Returns:
        A bytearray of random bytes whose length is selected by
        ``key_id_mode``.

    Raises:
        ValueError: If ``key_id_mode`` is not one of 0, 1, 2 or 3.
    """
    length_by_mode = {0: 0, 1: 1, 2: 5, 3: 9}

    # Fail with a clear message instead of the UnboundLocalError an
    # unmatched if/elif chain would produce for an unsupported mode.
    if key_id_mode not in length_by_mode:
        raise ValueError("Unsupported key_id_mode: {!r}".format(key_id_mode))

    length = length_by_mode[key_id_mode]
    return bytearray([random.getrandbits(8) for _ in range(length)])
380
381
def any_eui64():
    """Return a bytearray of 8 random bytes to use as an EUI-64."""
    octets = bytearray()
    for _ in range(8):
        octets.append(random.getrandbits(8))
    return octets
384
385
386class TestSourceAddress(unittest.TestCase):
387
388    def test_should_return_address_value_when_address_property_is_called(self):
389        # GIVEN
390        address = any_address()
391
392        source_address = mle.SourceAddress(address)
393
394        # WHEN
395        actual_address = source_address.address
396
397        # THEN
398        self.assertEqual(address, actual_address)
399
400
401class TestSourceAddressFactory(unittest.TestCase):
402
403    def test_should_create_SourceAddress_from_bytearray_when_parse_method_is_called(self):
404        # GIVEN
405        address = any_address()
406
407        factory = mle.SourceAddressFactory()
408
409        data = struct.pack(">H", address)
410
411        # WHEN
412        actual_source_address = factory.parse(io.BytesIO(data), dict())
413
414        # THEN
415        self.assertTrue(isinstance(actual_source_address, mle.SourceAddress))
416        self.assertEqual(address, actual_source_address.address)
417
418
419class TestMode(unittest.TestCase):
420
421    def test_should_return_receiver_value_when_receiver_property_is_called(self):
422        # GIVEN
423        receiver = any_receiver()
424
425        mode = mle.Mode(receiver, any_secure(), any_device_type(), any_network_data())
426
427        # WHEN
428        actual_receiver = mode.receiver
429
430        # THEN
431        self.assertEqual(receiver, actual_receiver)
432
433    def test_should_return_secure_value_when_secure_property_is_called(self):
434        # GIVEN
435        secure = any_secure()
436
437        mode = mle.Mode(any_receiver(), secure, any_device_type(), any_network_data())
438
439        # WHEN
440        actual_secure = mode.secure
441
442        # THEN
443        self.assertEqual(secure, actual_secure)
444
445    def test_should_return_device_type_value_when_device_type_property_is_called(self):
446        # GIVEN
447        device_type = any_device_type()
448
449        mode = mle.Mode(any_receiver(), any_secure(), device_type, any_network_data())
450
451        # WHEN
452        actual_device_type = mode.device_type
453
454        # THEN
455        self.assertEqual(device_type, actual_device_type)
456
457    def test_should_return_network_data_value_when_network_data_property_is_called(self):
458        # GIVEN
459        network_data = any_network_data()
460
461        mode = mle.Mode(any_receiver(), any_secure(), any_device_type(), network_data)
462
463        # WHEN
464        actual_network_data = mode.network_data
465
466        # THEN
467        self.assertEqual(network_data, actual_network_data)
468
469
470class TestModeFactory(unittest.TestCase):
471
472    def test_should_create_Mode_from_bytearray_when_parse_method_is_called(self):
473        # GIVEN
474        mode = any_mode()
475
476        factory = mle.ModeFactory()
477
478        data = bytearray([mode])
479
480        # WHEN
481        actual_mode = factory.parse(io.BytesIO(data), dict())
482
483        # THEN
484        self.assertTrue(isinstance(actual_mode, mle.Mode))
485        self.assertEqual(mode_map[mode]["receiver"], actual_mode.receiver)
486        self.assertEqual(mode_map[mode]["secure"], actual_mode.secure)
487        self.assertEqual(mode_map[mode]["device_type"], actual_mode.device_type)
488        self.assertEqual(mode_map[mode]["network_data"], actual_mode.network_data)
489
490
491class TestTimeout(unittest.TestCase):
492
493    def test_should_return_timeout_value_when_timeout_property_is_called(self):
494        # GIVEN
495        timeout = any_timeout()
496
497        timeout_obj = mle.Timeout(timeout)
498
499        # WHEN
500        actual_timeout = timeout_obj.timeout
501
502        # THEN
503        self.assertEqual(timeout, actual_timeout)
504
505
506class TestTimeoutFactory(unittest.TestCase):
507
508    def test_should_create_Timeout_from_bytearray_when_parse_method_is_called(self):
509        # GIVEN
510        timeout = any_timeout()
511
512        factory = mle.TimeoutFactory()
513
514        data = struct.pack(">I", timeout)
515
516        # WHEN
517        actual_timeout = factory.parse(io.BytesIO(data), dict())
518
519        # THEN
520        self.assertTrue(isinstance(actual_timeout, mle.Timeout))
521        self.assertEqual(timeout, actual_timeout.timeout)
522
523
524class TestChallenge(unittest.TestCase):
525
526    def test_should_return_challenge_value_when_challenge_property_is_called(self):
527        # GIVEN
528        challenge = any_challenge()
529
530        challenge_obj = mle.Challenge(challenge)
531
532        # WHEN
533        actual_challenge = challenge_obj.challenge
534
535        # THEN
536        self.assertEqual(challenge, actual_challenge)
537
538
539class TestChallengeFactory(unittest.TestCase):
540
541    def test_should_create_Challenge_from_bytearray_when_parse_method_is_called(self):
542        # GIVEN
543        challenge = any_challenge()
544
545        factory = mle.ChallengeFactory()
546
547        data = challenge
548
549        # WHEN
550        actual_challenge = factory.parse(io.BytesIO(data), dict())
551
552        # THEN
553        self.assertTrue(isinstance(actual_challenge, mle.Challenge))
554        self.assertEqual(challenge, actual_challenge.challenge)
555
556
557class TestResponse(unittest.TestCase):
558
559    def test_should_return_response_value_when_response_property_is_called(self):
560        # GIVEN
561        response = any_response()
562
563        response_obj = mle.Response(response)
564
565        # WHEN
566        actual_response = response_obj.response
567
568        # THEN
569        self.assertEqual(response, actual_response)
570
571
572class TestResponseFactory(unittest.TestCase):
573
574    def test_should_create_Challenge_from_bytearray_when_parse_method_is_called(self):
575        # GIVEN
576        response = any_response()
577
578        factory = mle.ResponseFactory()
579
580        data = response
581
582        # WHEN
583        actual_response = factory.parse(io.BytesIO(data), dict())
584
585        # THEN
586        self.assertTrue(isinstance(actual_response, mle.Response))
587        self.assertEqual(response, actual_response.response)
588
589
590class TestLinkLayerFrameCounter(unittest.TestCase):
591
592    def test_should_return_frame_counter_value_when_frame_counter_property_is_called(self):
593        # GIVEN
594        link_layer_frame_counter = any_link_layer_frame_counter()
595
596        link_layer_frame_counter_obj = mle.LinkLayerFrameCounter(link_layer_frame_counter)
597
598        # WHEN
599        actual_link_layer_frame_counter = link_layer_frame_counter_obj.frame_counter
600
601        # THEN
602        self.assertEqual(link_layer_frame_counter, actual_link_layer_frame_counter)
603
604
605class TestLinkLayerFrameCounterFactory(unittest.TestCase):
606
607    def test_should_create_LinkLayerFrameCounter_from_bytearray_when_parse_method_is_called(self):
608        # GIVEN
609        link_layer_frame_counter = any_link_layer_frame_counter()
610
611        factory = mle.LinkLayerFrameCounterFactory()
612
613        data = struct.pack(">I", link_layer_frame_counter)
614
615        # WHEN
616        actual_link_layer_frame_counter = factory.parse(io.BytesIO(data), dict())
617
618        # THEN
619        self.assertTrue(isinstance(actual_link_layer_frame_counter, mle.LinkLayerFrameCounter))
620        self.assertEqual(link_layer_frame_counter, actual_link_layer_frame_counter.frame_counter)
621
622
623class TestMleFrameCounter(unittest.TestCase):
624
625    def test_should_return_frame_counter_value_when_frame_counter_property_is_called(self):
626        # GIVEN
627        mle_frame_counter = any_mle_frame_counter()
628
629        mle_frame_counter_obj = mle.MleFrameCounter(mle_frame_counter)
630
631        # WHEN
632        actual_mle_frame_counter = mle_frame_counter_obj.frame_counter
633
634        # THEN
635        self.assertEqual(mle_frame_counter, actual_mle_frame_counter)
636
637
638class TestMleFrameCounterFactory(unittest.TestCase):
639
640    def test_should_create_MleFrameCounter_from_bytearray_when_parse_method_is_called(self):
641        # GIVEN
642        mle_frame_counter = any_mle_frame_counter()
643
644        factory = mle.MleFrameCounterFactory()
645
646        data = struct.pack(">I", mle_frame_counter)
647
648        # WHEN
649        actual_mle_frame_counter = factory.parse(io.BytesIO(data), dict())
650
651        # THEN
652        self.assertTrue(isinstance(actual_mle_frame_counter, mle.MleFrameCounter))
653        self.assertEqual(mle_frame_counter, actual_mle_frame_counter.frame_counter)
654
655
656class TestLinkQualityAndRouteData(unittest.TestCase):
657
658    def test_should_return_output_value_when_output_property_is_called(self):
659        # GIVEN
660        output = any_output()
661
662        lqrd = mle.LinkQualityAndRouteData(output, any_input(), any_route())
663
664        # WHEN
665        actual_output = lqrd.output
666
667        # THEN
668        self.assertEqual(output, actual_output)
669
670    def test_should_return_input_value_when_input_property_is_called(self):
671        # GIVEN
672        _input = any_input()
673
674        lqrd = mle.LinkQualityAndRouteData(any_output(), _input, any_route())
675
676        # WHEN
677        actual_input = lqrd.input
678
679        # THEN
680        self.assertEqual(_input, actual_input)
681
682    def test_should_return_route_value_when_route_property_is_called(self):
683        # GIVEN
684        route = any_route()
685
686        lqrd = mle.LinkQualityAndRouteData(any_output(), any_input(), route)
687
688        # WHEN
689        actual_route = lqrd.route
690
691        # THEN
692        self.assertEqual(route, actual_route)
693
694
695class TestLinkQualityAndRouteDataFactory(unittest.TestCase):
696
697    def test_should_create_LinkQualityAndRouteData_from_well_known_byte_when_parse_method_is_called(self):
698        # GIVEN
699        factory = mle.LinkQualityAndRouteDataFactory()
700
701        data = bytearray([0x66])
702
703        # WHEN
704        actual_lqrd = factory.parse(io.BytesIO(data), dict())
705
706        # THEN
707        self.assertEqual(1, actual_lqrd.output)
708        self.assertEqual(2, actual_lqrd.input)
709        self.assertEqual(6, actual_lqrd.route)
710
711    def test_should_create_LinkQualityAndRouteData_from_bytearray_when_parse_method_is_called(self):
712        # GIVEN
713        output = any_output()
714        _input = any_input()
715        route = any_route()
716
717        lqrd = (output << 6) | (_input << 4) | route
718
719        factory = mle.LinkQualityAndRouteDataFactory()
720
721        data = bytearray([lqrd])
722
723        # WHEN
724        actual_lqrd = factory.parse(io.BytesIO(data), dict())
725
726        # THEN
727        self.assertTrue(isinstance(actual_lqrd, mle.LinkQualityAndRouteData))
728        self.assertEqual(output, actual_lqrd.output)
729        self.assertEqual(_input, actual_lqrd.input)
730        self.assertEqual(route, actual_lqrd.route)
731
732
733class TestRoute64(unittest.TestCase):
734
735    def test_should_return_id_sequence_value_when_id_sequence_property_is_called(self):
736        # GIVEN
737        id_sequence = any_id_sequence()
738
739        route64_obj = mle.Route64(id_sequence, any_router_id_mask(), any_link_quality_and_route_data())
740
741        # WHEN
742        actual_id_sequence = route64_obj.id_sequence
743
744        # THEN
745        self.assertEqual(id_sequence, actual_id_sequence)
746
747    def test_should_return_router_id_mask_value_when_router_id_mask_property_is_called(self):
748        # GIVEN
749        router_id_mask = any_router_id_mask()
750
751        route64_obj = mle.Route64(any_id_sequence(), router_id_mask, any_link_quality_and_route_data())
752
753        # WHEN
754        actual_router_id_mask = route64_obj.router_id_mask
755
756        # THEN
757        self.assertEqual(router_id_mask, actual_router_id_mask)
758
759    def test_should_return_link_quality_and_route_data_value_when_link_quality_and_route_data_property_is_called(self):
760        # GIVEN
761        link_quality_and_route_data = any_link_quality_and_route_data()
762
763        route64_obj = mle.Route64(any_id_sequence(), any_router_id_mask(), link_quality_and_route_data)
764
765        # WHEN
766        actual_link_quality_and_route_data = route64_obj.link_quality_and_route_data
767
768        # THEN
769        self.assertEqual(link_quality_and_route_data, actual_link_quality_and_route_data)
770
771
772class TestRoute64Factory(unittest.TestCase):
773
774    def test_should_create_Route64_from_bytearray_when_parse_method_is_called(self):
775        # GIVEN
776        class DummyLQRDFactory:
777
778            def parse(self, data, context):
779                return ord(data.read(1))
780
781        id_sequence = any_id_sequence()
782        router_id_mask = any_router_id_mask()
783
784        router_count = 0
785        for i in range(64):
786            router_count += (router_id_mask >> i) & 0x01
787
788        link_quality_and_route_data = any_link_quality_and_route_data(router_count)
789
790        factory = mle.Route64Factory(DummyLQRDFactory())
791
792        data = bytearray([id_sequence]) + struct.pack(">Q", router_id_mask) + bytearray(link_quality_and_route_data)
793
794        # WHEN
795        actual_route64 = factory.parse(io.BytesIO(data), dict())
796
797        # THEN
798        self.assertTrue(isinstance(actual_route64, mle.Route64))
799        self.assertEqual(id_sequence, actual_route64.id_sequence)
800        self.assertEqual(router_id_mask, actual_route64.router_id_mask)
801        self.assertEqual([b for b in link_quality_and_route_data], actual_route64.link_quality_and_route_data)
802
803
804class TestAddress16(unittest.TestCase):
805
806    def test_should_return_address_value_when_address_property_is_called(self):
807        # GIVEN
808        address = any_address()
809
810        address16 = mle.Address16(address)
811
812        # WHEN
813        actual_address = address16.address
814
815        # THEN
816        self.assertEqual(address, actual_address)
817
818
819class TestAddress16Factory(unittest.TestCase):
820
821    def test_should_create_Address16_from_bytearray_when_parse_method_is_called(self):
822        # GIVEN
823        address = any_address()
824
825        factory = mle.Address16Factory()
826
827        data = struct.pack(">H", address)
828
829        # WHEN
830        actual_address16 = factory.parse(io.BytesIO(data), dict())
831
832        # THEN
833        self.assertTrue(isinstance(actual_address16, mle.Address16))
834        self.assertEqual(address, actual_address16.address)
835
836
837class TestLeaderData(unittest.TestCase):
838
839    def test_should_return_partition_id_value_when_partition_id_property_is_called(self):
840        # GIVEN
841        partition_id = any_partition_id()
842
843        leader_data = mle.LeaderData(partition_id, any_weighting(), any_data_version(), any_stable_data_version(),
844                                     any_leader_router_id())
845
846        # WHEN
847        actual_partition_id = leader_data.partition_id
848
849        # THEN
850        self.assertEqual(partition_id, actual_partition_id)
851
852    def test_should_return_weighting_value_when_weighting_property_is_called(self):
853        # GIVEN
854        weighting = any_weighting()
855
856        leader_data = mle.LeaderData(any_partition_id(), weighting, any_data_version(), any_stable_data_version(),
857                                     any_leader_router_id())
858
859        # WHEN
860        actual_weighting = leader_data.weighting
861
862        # THEN
863        self.assertEqual(weighting, actual_weighting)
864
865    def test_should_return_data_version_value_when_data_version_property_is_called(self):
866        # GIVEN
867        data_version = any_data_version()
868
869        leader_data = mle.LeaderData(any_partition_id(), any_weighting(), data_version, any_stable_data_version(),
870                                     any_leader_router_id())
871
872        # WHEN
873        actual_data_version = leader_data.data_version
874
875        # THEN
876        self.assertEqual(data_version, actual_data_version)
877
878    def test_should_return_stable_data_version_value_when_stable_data_version_property_is_called(self):
879        # GIVEN
880        stable_data_version = any_stable_data_version()
881
882        leader_data = mle.LeaderData(any_partition_id(), any_weighting(), any_data_version(), stable_data_version,
883                                     any_leader_router_id())
884
885        # WHEN
886        actual_stable_data_version = leader_data.stable_data_version
887
888        # THEN
889        self.assertEqual(stable_data_version, actual_stable_data_version)
890
891    def test_should_return_leader_router_id_value_when_leader_router_id_property_is_called(self):
892        # GIVEN
893        leader_router_id = any_leader_router_id()
894
895        leader_data = mle.LeaderData(any_partition_id(), any_weighting(), any_data_version(),
896                                     any_stable_data_version(), leader_router_id)
897
898        # WHEN
899        actual_leader_router_id = leader_data.leader_router_id
900
901        # THEN
902        self.assertEqual(leader_router_id, actual_leader_router_id)
903
904
905class TestLeaderDataFactory(unittest.TestCase):
906
907    def test_should_create_Address16_from_bytearray_when_parse_method_is_called(self):
908        # GIVEN
909        partition_id = any_partition_id()
910        weighting = any_weighting()
911        data_version = any_data_version()
912        stable_data_version = any_stable_data_version()
913        leader_router_id = any_leader_router_id()
914
915        factory = mle.LeaderDataFactory()
916
917        data = bytearray(struct.pack(">I", partition_id)) + \
918            bytearray([weighting, data_version, stable_data_version, leader_router_id])
919
920        # WHEN
921        actual_leader_data = factory.parse(io.BytesIO(data), dict())
922
923        # THEN
924        self.assertTrue(isinstance(actual_leader_data, mle.LeaderData))
925        self.assertEqual(partition_id, actual_leader_data.partition_id)
926        self.assertEqual(weighting, actual_leader_data.weighting)
927        self.assertEqual(data_version, actual_leader_data.data_version)
928        self.assertEqual(stable_data_version, actual_leader_data.stable_data_version)
929        self.assertEqual(leader_router_id, actual_leader_data.leader_router_id)
930
931
932class TestNetworkData(unittest.TestCase):
933
934    def test_should_return_tlvs_value_when_tlvs_property_is_called(self):
935        # GIVEN
936        tlvs = any_tlvs()
937
938        network_data = mle.NetworkData(tlvs)
939
940        # WHEN
941        actual_tlvs = network_data.tlvs
942
943        # THEN
944        self.assertEqual(tlvs, actual_tlvs)
945
946
947class TestNetworkDataFactory(unittest.TestCase):
948
949    def test_should_create_TlvRequest_from_bytearray_when_parse_method_is_called(self):
950        # GIVEN
951        class DummyNetworkTlvsFactory:
952
953            def parse(self, data, context):
954                return [b for b in bytearray(data.read())]
955
956        tlvs = any_tlvs()
957
958        factory = mle.NetworkDataFactory(DummyNetworkTlvsFactory())
959
960        data = bytearray(tlvs)
961
962        # WHEN
963        actual_network_data = factory.parse(io.BytesIO(data), dict())
964
965        # THEN
966        self.assertTrue(isinstance(actual_network_data, mle.NetworkData))
967        self.assertEqual(tlvs, actual_network_data.tlvs)
968
969
970class TestTlvRequest(unittest.TestCase):
971
972    def test_should_return_tlvs_value_when_tlvs_property_is_called(self):
973        # GIVEN
974        tlvs = any_tlvs()
975
976        tlv_request = mle.TlvRequest(tlvs)
977
978        # WHEN
979        actual_tlvs = tlv_request.tlvs
980
981        # THEN
982        self.assertEqual(tlvs, actual_tlvs)
983
984
985class TestTlvRequestFactory(unittest.TestCase):
986
987    def test_should_create_TlvRequest_from_bytearray_when_parse_method_is_called(self):
988        # GIVEN
989        tlvs = any_tlvs()
990
991        factory = mle.TlvRequestFactory()
992
993        data = bytearray(tlvs)
994
995        # WHEN
996        actual_tlv_request = factory.parse(io.BytesIO(data), dict())
997
998        # THEN
999        self.assertTrue(isinstance(actual_tlv_request, mle.TlvRequest))
1000        self.assertEqual(tlvs, actual_tlv_request.tlvs)
1001
1002
1003class TestScanMask(unittest.TestCase):
1004
1005    def test_should_return_router_value_when_router_property_is_called(self):
1006        # GIVEN
1007        router = any_scan_mask_router()
1008
1009        scan_mask = mle.ScanMask(router, any_scan_mask_end_device())
1010
1011        # WHEN
1012        actual_router = scan_mask.router
1013
1014        # THEN
1015        self.assertEqual(router, actual_router)
1016
1017    def test_should_return_end_device_value_when_end_device_property_is_called(self):
1018        # GIVEN
1019        end_device = any_scan_mask_end_device()
1020
1021        scan_mask = mle.ScanMask(any_scan_mask_router(), end_device)
1022
1023        # WHEN
1024        actual_end_device = scan_mask.end_device
1025
1026        # THEN
1027        self.assertEqual(end_device, actual_end_device)
1028
1029
1030class TestScanMaskFactory(unittest.TestCase):
1031
1032    def test_should_create_ScanMask_from_bytearray_when_parse_method_is_called(self):
1033        # GIVEN
1034        scan_mask = any_scan_mask()
1035
1036        factory = mle.ScanMaskFactory()
1037
1038        data = bytearray([scan_mask])
1039
1040        # WHEN
1041        actual_scan_mask = factory.parse(io.BytesIO(data), dict())
1042
1043        # THEN
1044        self.assertTrue(isinstance(actual_scan_mask, mle.ScanMask))
1045        self.assertEqual(scan_mask_map[scan_mask]["router"], actual_scan_mask.router)
1046        self.assertEqual(scan_mask_map[scan_mask]["end_device"], actual_scan_mask.end_device)
1047
1048
class TestConnectivity(unittest.TestCase):
    """Accessor tests for mle.Connectivity.

    Each test builds a Connectivity from nine positional arguments, pins one of
    them to a known value and checks that the matching property returns it.
    """

    def test_should_return_pp_value_when_pp_property_is_called(self):
        # GIVEN a Connectivity whose first argument is a known raw pp byte
        raw_pp = any_pp()
        connectivity = mle.Connectivity(
            raw_pp,
            any_link_quality_3(),
            any_link_quality_2(),
            any_link_quality_1(),
            any_leader_cost(),
            any_id_sequence(),
            any_active_routers(),
            any_sed_buffer_size(),
            any_sed_datagram_count(),
        )

        # WHEN
        observed = connectivity.pp

        # THEN the property equals the raw byte passed through common.map_pp
        self.assertEqual(common.map_pp(raw_pp), observed)

    def test_should_return_link_quality_3_value_when_link_quality_3_property_is_called(self):
        # GIVEN
        expected = any_link_quality_3()
        connectivity = mle.Connectivity(
            any_pp(),
            expected,
            any_link_quality_2(),
            any_link_quality_1(),
            any_leader_cost(),
            any_id_sequence(),
            any_active_routers(),
            any_sed_buffer_size(),
            any_sed_datagram_count(),
        )

        # WHEN / THEN
        self.assertEqual(expected, connectivity.link_quality_3)

    def test_should_return_link_quality_2_value_when_link_quality_2_property_is_called(self):
        # GIVEN
        expected = any_link_quality_2()
        connectivity = mle.Connectivity(
            any_pp(),
            any_link_quality_3(),
            expected,
            any_link_quality_1(),
            any_leader_cost(),
            any_id_sequence(),
            any_active_routers(),
            any_sed_buffer_size(),
            any_sed_datagram_count(),
        )

        # WHEN / THEN
        self.assertEqual(expected, connectivity.link_quality_2)

    def test_should_return_link_quality_1_value_when_link_quality_1_property_is_called(self):
        # GIVEN
        expected = any_link_quality_1()
        connectivity = mle.Connectivity(
            any_pp(),
            any_link_quality_3(),
            any_link_quality_2(),
            expected,
            any_leader_cost(),
            any_id_sequence(),
            any_active_routers(),
            any_sed_buffer_size(),
            any_sed_datagram_count(),
        )

        # WHEN / THEN
        self.assertEqual(expected, connectivity.link_quality_1)

    def test_should_return_leader_cost_value_when_leader_cost_property_is_called(self):
        # GIVEN
        expected = any_leader_cost()
        connectivity = mle.Connectivity(
            any_pp(),
            any_link_quality_3(),
            any_link_quality_2(),
            any_link_quality_1(),
            expected,
            any_id_sequence(),
            any_active_routers(),
            any_sed_buffer_size(),
            any_sed_datagram_count(),
        )

        # WHEN / THEN
        self.assertEqual(expected, connectivity.leader_cost)

    def test_should_return_id_sequence_value_when_id_sequence_property_is_called(self):
        # GIVEN
        expected = any_id_sequence()
        connectivity = mle.Connectivity(
            any_pp(),
            any_link_quality_3(),
            any_link_quality_2(),
            any_link_quality_1(),
            any_leader_cost(),
            expected,
            any_active_routers(),
            any_sed_buffer_size(),
            any_sed_datagram_count(),
        )

        # WHEN / THEN
        self.assertEqual(expected, connectivity.id_sequence)

    def test_should_return_active_routers_value_when_active_routers_property_is_called(self):
        # GIVEN
        expected = any_active_routers()
        connectivity = mle.Connectivity(
            any_pp(),
            any_link_quality_3(),
            any_link_quality_2(),
            any_link_quality_1(),
            any_leader_cost(),
            any_id_sequence(),
            expected,
            any_sed_buffer_size(),
            any_sed_datagram_count(),
        )

        # WHEN / THEN
        self.assertEqual(expected, connectivity.active_routers)

    def test_should_return_sed_buffer_size_value_when_sed_buffer_size_property_is_called(self):
        # GIVEN
        expected = any_sed_buffer_size()
        connectivity = mle.Connectivity(
            any_pp(),
            any_link_quality_3(),
            any_link_quality_2(),
            any_link_quality_1(),
            any_leader_cost(),
            any_id_sequence(),
            any_active_routers(),
            expected,
            any_sed_datagram_count(),
        )

        # WHEN / THEN
        self.assertEqual(expected, connectivity.sed_buffer_size)

    def test_should_return_sed_datagram_count_value_when_sed_datagram_count_property_is_called(self):
        # GIVEN
        expected = any_sed_datagram_count()
        connectivity = mle.Connectivity(
            any_pp(),
            any_link_quality_3(),
            any_link_quality_2(),
            any_link_quality_1(),
            any_leader_cost(),
            any_id_sequence(),
            any_active_routers(),
            any_sed_buffer_size(),
            expected,
        )

        # WHEN / THEN
        self.assertEqual(expected, connectivity.sed_datagram_count)
1176
1177
class TestConnectivityFactory(unittest.TestCase):
    """Checks mle.ConnectivityFactory.parse with and without the trailing SED fields."""

    def test_should_create_Connectivity_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        pp_byte = any_pp()
        link_quality_3 = any_link_quality_3()
        link_quality_2 = any_link_quality_2()
        link_quality_1 = any_link_quality_1()
        leader_cost = any_leader_cost()
        id_sequence = any_id_sequence()
        active_routers = any_active_routers()
        sed_buffer_size = any_sed_buffer_size()
        sed_datagram_count = any_sed_datagram_count()

        factory = mle.ConnectivityFactory()

        # Seven single-byte fields, then the SED buffer size as a big-endian
        # unsigned 16-bit value, then the single-byte SED datagram count.
        data = bytearray([
            pp_byte, link_quality_3, link_quality_2, link_quality_1, leader_cost, id_sequence, active_routers
        ]) + struct.pack(">H", sed_buffer_size) + bytearray([sed_datagram_count])

        # WHEN
        actual_connectivity = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance gives a descriptive failure message (actual type shown).
        self.assertIsInstance(actual_connectivity, mle.Connectivity)
        # pp is exposed in mapped form, so compare against common.map_pp of the raw byte.
        self.assertEqual(common.map_pp(pp_byte), actual_connectivity.pp)
        self.assertEqual(link_quality_3, actual_connectivity.link_quality_3)
        self.assertEqual(link_quality_2, actual_connectivity.link_quality_2)
        self.assertEqual(link_quality_1, actual_connectivity.link_quality_1)
        self.assertEqual(leader_cost, actual_connectivity.leader_cost)
        self.assertEqual(id_sequence, actual_connectivity.id_sequence)
        self.assertEqual(active_routers, actual_connectivity.active_routers)
        self.assertEqual(sed_buffer_size, actual_connectivity.sed_buffer_size)
        self.assertEqual(sed_datagram_count, actual_connectivity.sed_datagram_count)

    def test_should_create_Connectivity_without_sed_data_when_parse_method_is_called(self):
        # GIVEN
        pp_byte = any_pp()
        link_quality_3 = any_link_quality_3()
        link_quality_2 = any_link_quality_2()
        link_quality_1 = any_link_quality_1()
        leader_cost = any_leader_cost()
        id_sequence = any_id_sequence()
        active_routers = any_active_routers()
        # No SED values are generated here: this payload deliberately omits them.

        factory = mle.ConnectivityFactory()

        # Only the seven leading single-byte fields; the SED fields are absent.
        data = bytearray(
            [pp_byte, link_quality_3, link_quality_2, link_quality_1, leader_cost, id_sequence, active_routers])

        # WHEN
        actual_connectivity = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_connectivity, mle.Connectivity)
        self.assertEqual(common.map_pp(pp_byte), actual_connectivity.pp)
        self.assertEqual(link_quality_3, actual_connectivity.link_quality_3)
        self.assertEqual(link_quality_2, actual_connectivity.link_quality_2)
        self.assertEqual(link_quality_1, actual_connectivity.link_quality_1)
        self.assertEqual(leader_cost, actual_connectivity.leader_cost)
        self.assertEqual(id_sequence, actual_connectivity.id_sequence)
        self.assertEqual(active_routers, actual_connectivity.active_routers)
        # With the SED bytes missing, both SED properties are expected to be None.
        self.assertIsNone(actual_connectivity.sed_buffer_size)
        self.assertIsNone(actual_connectivity.sed_datagram_count)
1244
1245
class TestLinkMargin(unittest.TestCase):
    """Accessor test for mle.LinkMargin."""

    def test_should_return_link_margin_value_when_link_margin_property_is_called(self):
        # GIVEN a LinkMargin built from an arbitrary value
        expected = any_link_margin()
        margin = mle.LinkMargin(expected)

        # WHEN / THEN the property returns the constructor argument
        self.assertEqual(expected, margin.link_margin)
1259
1260
class TestLinkMarginFactory(unittest.TestCase):
    """Checks that mle.LinkMarginFactory.parse builds a mle.LinkMargin from a single byte."""

    def test_should_create_LinkMargin_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        link_margin = any_link_margin()

        factory = mle.LinkMarginFactory()

        # The payload is just the one link-margin byte.
        data = bytearray([link_margin])

        # WHEN
        actual_link_margin = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_link_margin, mle.LinkMargin)
        self.assertEqual(link_margin, actual_link_margin.link_margin)
1277
1278
class TestStatus(unittest.TestCase):
    """Accessor test for mle.Status."""

    def test_should_return_status_value_when_status_property_is_called(self):
        # GIVEN a Status built from an arbitrary value
        expected = any_status()
        status_tlv = mle.Status(expected)

        # WHEN / THEN the property returns the constructor argument
        self.assertEqual(expected, status_tlv.status)
1292
1293
class TestStatusFactory(unittest.TestCase):
    """Checks that mle.StatusFactory.parse builds a mle.Status from a single byte."""

    def test_should_create_Status_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        status = any_status()

        factory = mle.StatusFactory()

        # The payload is just the one status byte.
        data = bytearray([status])

        # WHEN
        actual_status = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_status, mle.Status)
        self.assertEqual(status, actual_status.status)
1310
1311
class TestVersion(unittest.TestCase):
    """Accessor test for mle.Version."""

    def test_should_return_version_value_when_version_property_is_called(self):
        # GIVEN a Version built from an arbitrary value
        expected = any_version()
        version_tlv = mle.Version(expected)

        # WHEN / THEN the property returns the constructor argument
        self.assertEqual(expected, version_tlv.version)
1325
1326
class TestVersionFactory(unittest.TestCase):
    """Checks that mle.VersionFactory.parse builds a mle.Version from two bytes."""

    def test_should_create_Version_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        version = any_version()

        factory = mle.VersionFactory()

        # Version is encoded as a big-endian unsigned 16-bit integer.
        data = struct.pack(">H", version)

        # WHEN
        actual_version = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_version, mle.Version)
        self.assertEqual(version, actual_version.version)
1343
1344
class TestAddressRegistrationFull(unittest.TestCase):
    """Accessor test for mle.AddressFull."""

    def test_should_return_ipv6_address_value_when_ipv6_address_property_is_called(self):
        # GIVEN an AddressFull built from an arbitrary IPv6 address
        expected = any_ipv6_address()
        full_address = mle.AddressFull(expected)

        # WHEN / THEN the property returns the constructor argument
        self.assertEqual(expected, full_address.ipv6_address)
1358
1359
class TestAddressRegistrationFullFactory(unittest.TestCase):
    """Checks that mle.AddressFullFactory.parse builds a mle.AddressFull from raw bytes."""

    def test_should_create_AddressFull_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        ipv6_address = any_ipv6_address()

        factory = mle.AddressFullFactory()

        # A leading 0x00 byte followed by the address bytes.
        data = bytearray([0x00]) + bytearray(ipv6_address)

        # WHEN
        actual_addr_reg_full = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_addr_reg_full, mle.AddressFull)
        self.assertEqual(ipv6_address, actual_addr_reg_full.ipv6_address)
1376
1377
class TestAddressRegistrationCompressed(unittest.TestCase):
    """Accessor tests for mle.AddressCompressed (cid and iid)."""

    def test_should_return_cid_value_when_cid_property_is_called(self):
        # GIVEN an AddressCompressed with a known cid and an arbitrary iid
        expected_cid = any_cid()
        compressed = mle.AddressCompressed(expected_cid, any_iid())

        # WHEN / THEN the cid property returns the first constructor argument
        self.assertEqual(expected_cid, compressed.cid)

    def test_should_return_cid_value_when_iid_property_is_called(self):
        # NOTE: despite "cid_value" in the method name, this test verifies the
        # iid property; the name is kept so the test id stays stable.

        # GIVEN an AddressCompressed with an arbitrary cid and a known iid
        expected_iid = any_iid()
        compressed = mle.AddressCompressed(any_cid(), expected_iid)

        # WHEN / THEN the iid property returns the second constructor argument
        self.assertEqual(expected_iid, compressed.iid)
1403
1404
class TestAddressRegistrationCompressedFactory(unittest.TestCase):
    """Checks that mle.AddressCompressedFactory.parse builds a mle.AddressCompressed."""

    def test_should_create_AddressRegistrationCompressed_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        cid = any_cid()
        iid = any_iid()

        factory = mle.AddressCompressedFactory()

        # First byte: top bit set, OR-ed with the cid; the iid bytes follow.
        data = bytearray([(1 << 7) | cid]) + iid

        # WHEN
        actual_addr_reg_compressed = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_addr_reg_compressed, mle.AddressCompressed)
        self.assertEqual(cid, actual_addr_reg_compressed.cid)
        self.assertEqual(iid, actual_addr_reg_compressed.iid)
1423
1424
class TestAddressRegistration(unittest.TestCase):
    """Accessor test for mle.AddressRegistration."""

    def test_should_return_addresses_value_when_addresses_property_is_called(self):
        # GIVEN an AddressRegistration built from an arbitrary address collection
        expected = any_addresses()
        registration = mle.AddressRegistration(expected)

        # WHEN / THEN the property returns the constructor argument
        self.assertEqual(expected, registration.addresses)
1438
1439
class TestAddressRegistrationFactory(unittest.TestCase):
    """Checks mle.AddressRegistrationFactory.parse on one compressed plus one full address."""

    def test_should_create_AddressRegistration_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        cid = any_cid()
        iid = any_iid()
        ipv6_address = any_ipv6_address()

        # Expected entries, in the order they are serialized below.
        addresses = [mle.AddressCompressed(cid, iid), mle.AddressFull(ipv6_address)]

        factory = mle.AddressRegistrationFactory(mle.AddressCompressedFactory(), mle.AddressFullFactory())

        # Compressed entry (top bit set | cid, then iid) followed by a full
        # entry (0 byte, then the address bytes).
        data = bytearray([(1 << 7) | cid]) + iid + bytearray([0]) + ipv6_address

        # WHEN
        actual_addr_reg = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_addr_reg, mle.AddressRegistration)
        self.assertEqual(addresses[0].cid, actual_addr_reg.addresses[0].cid)
        self.assertEqual(addresses[0].iid, actual_addr_reg.addresses[0].iid)
        self.assertEqual(addresses[1].ipv6_address, actual_addr_reg.addresses[1].ipv6_address)
1462
1463
class TestChannel(unittest.TestCase):
    """Accessor tests for mle.Channel (channel_page and channel)."""

    def test_should_return_channel_page_value_when_channel_page_property_is_called(self):
        # GIVEN a Channel with a known page and an arbitrary channel
        expected_page = any_channel_page()
        channel_tlv = mle.Channel(expected_page, any_channel())

        # WHEN / THEN the channel_page property returns the first constructor argument
        self.assertEqual(expected_page, channel_tlv.channel_page)

    def test_should_return_channel_value_when_channel_property_is_called(self):
        # GIVEN a Channel with an arbitrary page and a known channel
        expected_channel = any_channel()
        channel_tlv = mle.Channel(any_channel_page(), expected_channel)

        # WHEN / THEN the channel property returns the second constructor argument
        self.assertEqual(expected_channel, channel_tlv.channel)
1489
1490
class TestChannelFactory(unittest.TestCase):
    """Checks that mle.ChannelFactory.parse builds a mle.Channel from three bytes."""

    def test_should_create_Channel_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        channel_page = any_channel_page()
        channel = any_channel()

        factory = mle.ChannelFactory()

        # One byte of channel page, then the channel as a big-endian unsigned 16-bit value.
        data = bytearray([channel_page]) + struct.pack(">H", channel)

        # WHEN
        actual_channel = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_channel, mle.Channel)
        self.assertEqual(channel_page, actual_channel.channel_page)
        self.assertEqual(channel, actual_channel.channel)
1509
1510
class TestPanId(unittest.TestCase):
    """Accessor test for mle.PanId."""

    def test_should_return_pan_id_value_when_pan_id_property_is_called(self):
        # GIVEN a PanId built from an arbitrary value
        expected = any_pan_id()
        pan_id_tlv = mle.PanId(expected)

        # WHEN / THEN the property returns the constructor argument
        self.assertEqual(expected, pan_id_tlv.pan_id)
1524
1525
class TestPanIdFactory(unittest.TestCase):
    """Checks that mle.PanIdFactory.parse builds a mle.PanId from two bytes."""

    def test_should_create_PanId_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        pan_id = any_pan_id()

        factory = mle.PanIdFactory()

        # PAN ID is encoded as a big-endian unsigned 16-bit integer.
        data = struct.pack(">H", pan_id)

        # WHEN
        actual_pan_id = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_pan_id, mle.PanId)
        self.assertEqual(pan_id, actual_pan_id.pan_id)
1542
1543
class TestActiveTimestamp(unittest.TestCase):
    """Accessor tests for mle.ActiveTimestamp (timestamp_seconds, timestamp_ticks, u)."""

    def test_should_return_timestamp_seconds_value_when_timestamp_seconds_property_is_called(self):
        # GIVEN an ActiveTimestamp with known seconds
        expected_seconds = any_timestamp_seconds()
        timestamp = mle.ActiveTimestamp(expected_seconds, any_timestamp_ticks(), any_u())

        # WHEN / THEN
        self.assertEqual(expected_seconds, timestamp.timestamp_seconds)

    def test_should_return_timestamp_ticks_value_when_timestamp_ticks_property_is_called(self):
        # GIVEN an ActiveTimestamp with known ticks
        expected_ticks = any_timestamp_ticks()
        timestamp = mle.ActiveTimestamp(any_timestamp_seconds(), expected_ticks, any_u())

        # WHEN / THEN
        self.assertEqual(expected_ticks, timestamp.timestamp_ticks)

    def test_should_return_u_value_when_u_property_is_called(self):
        # GIVEN an ActiveTimestamp with a known u value
        expected_u = any_u()
        timestamp = mle.ActiveTimestamp(any_timestamp_seconds(), any_timestamp_ticks(), expected_u)

        # WHEN / THEN
        self.assertEqual(expected_u, timestamp.u)
1581
1582
class TestActiveTimestampFactory(unittest.TestCase):
    """Checks that mle.ActiveTimestampFactory.parse builds a mle.ActiveTimestamp from eight bytes."""

    def test_should_create_ActiveTimestamp_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        timestamp_seconds = any_timestamp_seconds()
        timestamp_ticks = any_timestamp_ticks()
        u = any_u()

        factory = mle.ActiveTimestampFactory()

        # Seconds are packed as a big-endian 64-bit value with the two leading
        # bytes dropped (six bytes remain); the final big-endian 16-bit word
        # holds the ticks shifted left by one with u in the lowest bit.
        data = struct.pack(">Q", timestamp_seconds)[2:] + struct.pack(">H", (timestamp_ticks << 1) | u)

        # WHEN
        active_timestamp = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(active_timestamp, mle.ActiveTimestamp)
        self.assertEqual(timestamp_seconds, active_timestamp.timestamp_seconds)
        self.assertEqual(timestamp_ticks, active_timestamp.timestamp_ticks)
        self.assertEqual(u, active_timestamp.u)
1603
1604
class TestPendingTimestamp(unittest.TestCase):
    """Accessor tests for mle.PendingTimestamp (timestamp_seconds, timestamp_ticks, u)."""

    def test_should_return_timestamp_seconds_value_when_timestamp_seconds_property_is_called(self):
        # GIVEN a PendingTimestamp with known seconds
        expected_seconds = any_timestamp_seconds()
        timestamp = mle.PendingTimestamp(expected_seconds, any_timestamp_ticks(), any_u())

        # WHEN / THEN
        self.assertEqual(expected_seconds, timestamp.timestamp_seconds)

    def test_should_return_timestamp_ticks_value_when_timestamp_ticks_property_is_called(self):
        # GIVEN a PendingTimestamp with known ticks
        expected_ticks = any_timestamp_ticks()
        timestamp = mle.PendingTimestamp(any_timestamp_seconds(), expected_ticks, any_u())

        # WHEN / THEN
        self.assertEqual(expected_ticks, timestamp.timestamp_ticks)

    def test_should_return_u_value_when_u_property_is_called(self):
        # GIVEN a PendingTimestamp with a known u value
        expected_u = any_u()
        timestamp = mle.PendingTimestamp(any_timestamp_seconds(), any_timestamp_ticks(), expected_u)

        # WHEN / THEN
        self.assertEqual(expected_u, timestamp.u)
1642
1643
class TestPendingTimestampFactory(unittest.TestCase):
    """Checks that mle.PendingTimestampFactory.parse builds a mle.PendingTimestamp from eight bytes."""

    def test_should_create_PendingTimestamp_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        timestamp_seconds = any_timestamp_seconds()
        timestamp_ticks = any_timestamp_ticks()
        u = any_u()

        factory = mle.PendingTimestampFactory()

        # Seconds are packed as a big-endian 64-bit value with the two leading
        # bytes dropped (six bytes remain); the final big-endian 16-bit word
        # holds the ticks shifted left by one with u in the lowest bit.
        data = struct.pack(">Q", timestamp_seconds)[2:] + struct.pack(">H", (timestamp_ticks << 1) | u)

        # WHEN
        pending_timestamp = factory.parse(io.BytesIO(data), dict())

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(pending_timestamp, mle.PendingTimestamp)
        self.assertEqual(timestamp_seconds, pending_timestamp.timestamp_seconds)
        self.assertEqual(timestamp_ticks, pending_timestamp.timestamp_ticks)
        self.assertEqual(u, pending_timestamp.u)
1664
1665
class TestMleCommandFactory(unittest.TestCase):
    """Checks that mle.MleCommandFactory.parse decodes a fixed command byte string into its TLVs."""

    def test_should_create_MleCommand_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        # Fixed capture of one command; the leading 0x0b is the command type
        # (asserted as 11 below) and the rest decodes to the eight TLVs checked
        # in the THEN section.
        data = bytearray([
            0x0b, 0x04, 0x08, 0xa5, 0xf2, 0x9b, 0xde, 0xe3, 0xd8, 0xbe, 0xb9, 0x05, 0x04, 0x00, 0x00, 0x00, 0x00, 0x08,
            0x04, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x0d, 0x02, 0x04, 0x00, 0x00, 0x00, 0xf0, 0x12, 0x02, 0x00, 0x02,
            0x13, 0x09, 0x80, 0x86, 0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8, 0x0d, 0x03, 0x0a, 0x0c, 0x09
        ])

        factory = mle.MleCommandFactory(config.create_default_mle_tlvs_factories())

        # WHEN
        actual_mle_command = factory.parse(io.BytesIO(data), None)

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_mle_command, mle.MleCommand)

        self.assertEqual(11, actual_mle_command.type)

        # TLVs are compared positionally, in the order they appear in the payload.
        self.assertEqual(mle.Response(bytearray([0xa5, 0xf2, 0x9b, 0xde, 0xe3, 0xd8, 0xbe, 0xb9])),
                         actual_mle_command.tlvs[0])

        self.assertEqual(mle.LinkLayerFrameCounter(0), actual_mle_command.tlvs[1])

        self.assertEqual(mle.MleFrameCounter(1), actual_mle_command.tlvs[2])

        self.assertEqual(mle.Mode(receiver=1, secure=1, device_type=0, network_data=1), actual_mle_command.tlvs[3])

        self.assertEqual(mle.Timeout(240), actual_mle_command.tlvs[4])

        self.assertEqual(mle.Version(2), actual_mle_command.tlvs[5])

        self.assertEqual(
            mle.AddressRegistration(addresses=[
                mle.AddressCompressed(cid=0, iid=bytearray([0x86, 0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8]))
            ]), actual_mle_command.tlvs[6])

        self.assertEqual(mle.TlvRequest(tlvs=[10, 12, 9]), actual_mle_command.tlvs[7])
1704
1705
class TestMleMessageFactory(unittest.TestCase):
    """Checks the default MLE message factory against two fixed secured-message captures.

    Both tests build the factory with config.DEFAULT_NETWORK_KEY, parse a fixed
    byte string together with a populated common.MessageInfo, and compare the
    resulting command type, TLVs and MIC with known values.
    """

    def test_should_create_MleMessageSecured_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        # Addressing information handed to the factory alongside the raw bytes.
        message_info = common.MessageInfo()
        message_info.source_ipv6 = "fe80::10cf:d38b:3b61:5558"
        message_info.destination_ipv6 = "fe80::383e:9eed:7a01:36a5"

        message_info.source_mac_address = common.MacAddress.from_eui64(
            bytearray([0x12, 0xcf, 0xd3, 0x8b, 0x3b, 0x61, 0x55, 0x58]))
        message_info.destination_mac_address = common.MacAddress.from_eui64(
            bytearray([0x3a, 0x3e, 0x9e, 0xed, 0x7a, 0x01, 0x36, 0xa5]))

        # Fixed capture of a secured message; its last four bytes are the MIC
        # asserted at the end of this test.
        data = bytearray([
            0x00, 0x15, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x14, 0x03, 0xe3, 0x72, 0x50, 0x4f, 0x8c,
            0x5c, 0x42, 0x81, 0x68, 0xe2, 0x11, 0xfc, 0xf5, 0x8c, 0x62, 0x8e, 0x83, 0x99, 0xe7, 0x26, 0x86, 0x34, 0x3b,
            0xa7, 0x68, 0xc7, 0x93, 0xfb, 0x72, 0xd9, 0xcc, 0x13, 0x5e, 0x5b, 0x96, 0x0e, 0xf1, 0x80, 0x03, 0x55, 0x4f,
            0x27, 0xc2, 0x96, 0xf4, 0x9c, 0x65, 0x82, 0x97, 0xcf, 0x97, 0x35, 0x89, 0xc2
        ])

        factory = config.create_default_mle_message_factory(network_key=config.DEFAULT_NETWORK_KEY)

        # WHEN
        actual_mle_message = factory.parse(io.BytesIO(data), message_info)

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_mle_message, mle.MleMessageSecured)

        self.assertEqual(11, actual_mle_message.command.type)

        # TLVs are compared positionally against the expected decoded command.
        self.assertEqual(mle.Response(bytearray([0xa5, 0xf2, 0x9b, 0xde, 0xe3, 0xd8, 0xbe, 0xb9])),
                         actual_mle_message.command.tlvs[0])

        self.assertEqual(mle.LinkLayerFrameCounter(0), actual_mle_message.command.tlvs[1])

        self.assertEqual(mle.MleFrameCounter(1), actual_mle_message.command.tlvs[2])

        self.assertEqual(mle.Mode(receiver=1, secure=1, device_type=0, network_data=1),
                         actual_mle_message.command.tlvs[3])

        self.assertEqual(mle.Timeout(240), actual_mle_message.command.tlvs[4])

        self.assertEqual(mle.Version(2), actual_mle_message.command.tlvs[5])

        self.assertEqual(
            mle.AddressRegistration(addresses=[
                mle.AddressCompressed(cid=0, iid=bytearray([0x86, 0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8]))
            ]), actual_mle_message.command.tlvs[6])

        self.assertEqual(mle.TlvRequest(tlvs=[10, 12, 9]), actual_mle_message.command.tlvs[7])

        # The MIC must equal the trailing four bytes of the input.
        self.assertEqual(bytearray(data[-4:]), actual_mle_message.mic)

    def test_should_create_MleMessageSecured_with_MLE_Data_Response_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        # Addressing information handed to the factory alongside the raw bytes.
        message_info = common.MessageInfo()
        message_info.source_ipv6 = "fe80::241c:b11b:7b62:caf1"
        message_info.destination_ipv6 = "ff02::1"

        message_info.source_mac_address = common.MacAddress.from_eui64(
            bytearray([0x26, 0x1c, 0xb1, 0x1b, 0x7b, 0x62, 0xca, 0xf1]))
        message_info.destination_mac_address = common.MacAddress.from_eui64(
            bytearray([0x3a, 0xba, 0xad, 0xca, 0xfe, 0xde, 0xff, 0xa5]))

        # Fixed capture of a secured message; its last four bytes are the MIC
        # asserted at the end of this test.
        data = bytearray([
            0x00, 0x15, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xca, 0xd3, 0x45, 0xe2, 0x35, 0x1d, 0x00,
            0x2d, 0x72, 0x71, 0xb1, 0x19, 0xaf, 0x8b, 0x05, 0xd9, 0x52, 0x74, 0xce, 0xe6, 0x36, 0x53, 0xeb, 0xc6, 0x25,
            0x94, 0x01, 0x6d, 0x20, 0xdf, 0x30, 0x82, 0xf8, 0xbb, 0x34, 0x47, 0x42, 0x50, 0xe9, 0x41, 0xa7, 0x33, 0xa5
        ])

        factory = config.create_default_mle_message_factory(network_key=config.DEFAULT_NETWORK_KEY)

        # WHEN
        actual_mle_message = factory.parse(io.BytesIO(data), message_info)

        # THEN
        # assertIsInstance shows the actual type when the check fails.
        self.assertIsInstance(actual_mle_message, mle.MleMessageSecured)

        self.assertEqual(8, actual_mle_message.command.type)

        self.assertEqual(mle.SourceAddress(address=0x9400), actual_mle_message.command.tlvs[0])

        self.assertEqual(
            mle.LeaderData(partition_id=0x06d014ca,
                           weighting=64,
                           data_version=131,
                           stable_data_version=168,
                           leader_router_id=37), actual_mle_message.command.tlvs[1])

        # The third TLV carries nested network data: one Prefix with two sub-TLVs.
        self.assertEqual(
            mle.NetworkData(tlvs=[
                network_data.Prefix(domain_id=0,
                                    prefix_length=64,
                                    prefix=bytearray([0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34]),
                                    sub_tlvs=[
                                        network_data.LowpanId(c=1, cid=1, context_length=64, stable=1),
                                        network_data.BorderRouter(
                                            border_router_16=37888, prf=0, p=1, s=1, d=0, c=0, r=1, o=1, n=0, stable=1)
                                    ],
                                    stable=1)
            ]), actual_mle_message.command.tlvs[2])

        # The MIC must equal the trailing four bytes of the input.
        self.assertEqual(bytearray(data[-4:]), actual_mle_message.mic)
1809
1810
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1813