1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import io
31import random
32import string
33import unittest
34
35import coap
36
37
def any_delta():
    """Return a random 4-bit value (0..15) usable as an option delta nibble."""
    nibble_bits = 4
    return random.getrandbits(nibble_bits)
40
41
def any_coap_option_type():
    """Return a random 4-bit value (0..15) to use as a CoAP option type."""
    type_bits = 4
    return random.getrandbits(type_bits)
44
45
def any_value():
    """Return a random single-byte value (0..255)."""
    byte_bits = 8
    return random.getrandbits(byte_bits)
48
49
def any_4bits_value_different_than_13_and_14():
    """Return a random 4-bit value (0..15) that is neither 13 nor 14.

    Values are redrawn until an acceptable one comes up.
    """
    excluded = (13, 14)
    while True:
        candidate = random.getrandbits(4)
        if candidate not in excluded:
            return candidate
58
59
def any_4bits_value_lower_or_equal_than_12():
    """Return a random 4-bit value in the range 0..12.

    Values are redrawn until one not greater than 12 comes up.
    """
    upper_bound = 12
    while True:
        candidate = random.getrandbits(4)
        if candidate <= upper_bound:
            return candidate
68
69
def any_bytearray(length):
    """Return a ``bytearray`` holding ``length`` random bytes."""
    octets = bytearray()
    for _ in range(length):
        octets.append(random.getrandbits(8))
    return octets
72
73
def any_version():
    """Return a random 2-bit value (0..3) to use as a CoAP version."""
    version_bits = 2
    return random.getrandbits(version_bits)
76
77
def any_type():
    """Return a random 2-bit value (0..3) to use as a CoAP message type."""
    type_bits = 2
    return random.getrandbits(type_bits)
80
81
def any_code():
    """Return a random 8-bit value (0..255) to use as a CoAP code."""
    code_bits = 8
    return random.getrandbits(code_bits)
84
85
def any_message_id():
    """Return a random 16-bit value (0..65535) to use as a CoAP message id."""
    message_id_bits = 16
    return random.getrandbits(message_id_bits)
88
89
def any_token():
    """Return a ``bytearray`` of random bytes whose length is drawn from 0..8."""
    token_length = random.randint(0, 8)
    return bytearray(random.getrandbits(8) for _ in range(token_length))
93
94
def any_options():
    """Return a new, empty options list."""
    return list()
97
98
def any_payload(length=None):
    """Return a ``bytearray`` of random bytes to use as a payload.

    Args:
        length: Number of bytes to generate. When ``None``, a length is
            drawn at random from 0..64.
    """
    if length is None:
        length = random.randint(0, 64)

    return bytearray(random.getrandbits(8) for _ in range(length))
102
103
def any_uri_path():
    """Return a URI path made of ``/`` followed by one random lowercase ASCII letter."""
    segment = random.choice(string.ascii_lowercase)
    return "".join(("/", segment))
106
107
class TestCoapMessageOptionHeader(unittest.TestCase):
    """Tests for ``coap.CoapOptionHeader``: extended values, parsing and the payload marker."""

    def test_should_return_passed_on_value_when_read_extended_value_is_called_with_value_different_than_13_and_14(
            self):
        # GIVEN a nibble other than 13/14; no stream is supplied for this case
        nibble = any_4bits_value_different_than_13_and_14()

        # WHEN
        result = coap.CoapOptionHeader._read_extended_value(None, nibble)

        # THEN the nibble comes back unchanged
        self.assertEqual(nibble, result)

    def test_should_return_value_stored_in_first_byte_plus_13_when_read_extended_value_is_called_with_value_equal_13(
            self):
        # GIVEN a stream holding a single extension byte
        extension_byte = any_value()
        stream = io.BytesIO(bytearray([extension_byte]))

        # WHEN
        result = coap.CoapOptionHeader._read_extended_value(stream, 13)

        # THEN
        self.assertEqual(extension_byte + 13, result)

    def test_should_return_value_stored_in_first_byte_plus_269_when_read_extended_value_is_called_with_value_equal_14(
            self):
        # GIVEN a two-byte stream; only the second byte feeds the expectation
        # NOTE(review): RFC 7252 describes the 14 case as a 16-bit extended
        # field, yet the first byte here is random filler that the expected
        # value ignores - confirm this is intended.
        trailing_byte = any_value()
        leading_byte = any_value()
        stream = io.BytesIO(bytearray([leading_byte, trailing_byte]))

        # WHEN
        result = coap.CoapOptionHeader._read_extended_value(stream, 14)

        # THEN
        self.assertEqual(trailing_byte + 269, result)

    def test_should_create_CoapOptionHeader_when_from_bytes_classmethod_is_called(self):
        # GIVEN delta in the high nibble and length in the low nibble
        delta = any_4bits_value_different_than_13_and_14()
        length = any_4bits_value_different_than_13_and_14()
        initial_byte = (delta << 4) | length
        stream = io.BytesIO(bytearray([initial_byte]))

        # WHEN
        header = coap.CoapOptionHeader.from_bytes(stream)

        # THEN
        self.assertEqual(delta, header.delta)
        self.assertEqual(length, header.length)

    def test_should_return_True_when_is_payload_marker_property_called_with_delta_and_length_equal_15(self):
        # GIVEN a header byte whose delta and length nibbles are both 15
        marker_nibble = 15
        stream = io.BytesIO(bytearray([(marker_nibble << 4) | marker_nibble]))

        # WHEN
        header = coap.CoapOptionHeader.from_bytes(stream)

        # THEN
        self.assertTrue(header.is_payload_marker)
175
176
class TestCoapOption(unittest.TestCase):
    """Tests for the ``type`` and ``value`` accessors of ``coap.CoapOption``."""

    def test_should_return_type_value_when_type_property_is_called(self):
        # GIVEN
        option_type = any_coap_option_type()
        option = coap.CoapOption(option_type, any_value())

        # WHEN
        observed = option.type

        # THEN
        self.assertEqual(option_type, observed)

    def test_should_return_value_value_when_value_property_is_called(self):
        # GIVEN
        option_value = any_value()
        option = coap.CoapOption(any_coap_option_type(), option_value)

        # WHEN
        observed = option.value

        # THEN
        self.assertEqual(option_value, observed)
202
203
class TestCoapOptionsFactory(unittest.TestCase):
    """Tests for ``coap.CoapOptionsFactory.parse`` on a single encoded option."""

    def test_should_create_list_of_CoapOption_from_bytearray_when_parse_method_is_called(self):
        # GIVEN one option: header byte (delta high nibble, length low nibble)
        # followed by exactly `value_length` value bytes
        option_delta = any_4bits_value_lower_or_equal_than_12()
        value_length = any_4bits_value_lower_or_equal_than_12()
        option_value = any_bytearray(value_length)

        encoded = bytearray([(option_delta << 4) | value_length])
        encoded.extend(option_value)

        options_factory = coap.CoapOptionsFactory()

        # WHEN
        parsed = options_factory.parse(io.BytesIO(encoded), None)

        # THEN
        self.assertEqual(1, len(parsed))
        only_option = parsed[0]
        self.assertEqual(option_delta, only_option.type)
        self.assertEqual(option_value, only_option.value)
223
224
class TestCoapCode(unittest.TestCase):
    """Tests for ``coap.CoapCode`` accessors and its alternate constructors."""

    def test_should_return_code_value_when_code_property_is_called(self):
        # GIVEN
        raw_code = any_code()

        # WHEN
        observed = coap.CoapCode(raw_code).code

        # THEN
        self.assertEqual(raw_code, observed)

    def test_should_return_class_value_when_class_property_is_called(self):
        # GIVEN the class is expected in the top three bits of the code
        raw_code = any_code()
        expected_class = (raw_code >> 5) & 0x7

        # WHEN
        observed = coap.CoapCode(raw_code)._class

        # THEN
        self.assertEqual(expected_class, observed)

    def test_should_return_detail_value_when_detail_property_is_called(self):
        # GIVEN the detail is expected in the low five bits of the code
        raw_code = any_code()
        expected_detail = raw_code & 0x1F

        # WHEN
        observed = coap.CoapCode(raw_code).detail

        # THEN
        self.assertEqual(expected_detail, observed)

    def test_should_return_dotted_value_when_dotted_property_is_called(self):
        # GIVEN
        raw_code = any_code()

        # WHEN
        dotted = coap.CoapCode(raw_code).dotted

        # THEN the "<class>.<detail>" text recombines into the original code
        class_text, detail_text = dotted.split(".")
        rebuilt = (int(class_text) << 5) | int(detail_text)
        self.assertEqual(raw_code, rebuilt)

    def test_should_create_CoapCode_when_from_class_and_detail_classmethod_is_called(self):
        # GIVEN
        raw_code = any_code()
        code_class = (raw_code >> 5) & 0x7
        code_detail = raw_code & 0x1F

        # WHEN
        built = coap.CoapCode.from_class_and_detail(code_class, code_detail)

        # THEN
        self.assertEqual(raw_code, built.code)

    def test_should_create_CoapCode_when_from_dotted_string_classmethod_is_called(self):
        # GIVEN
        raw_code = any_code()
        dotted = coap.CoapCode(raw_code).dotted

        # WHEN
        rebuilt = coap.CoapCode.from_dotted(dotted)

        # THEN
        self.assertEqual(raw_code, rebuilt.code)
300
301
class TestCoapMessage(unittest.TestCase):
    """Tests that each ``coap.CoapMessage`` property reflects the constructor argument."""

    def _build_message(self, *trailing, **pinned):
        # Builds a CoapMessage positionally. Fields named in `pinned` use the
        # given value; every other field gets a freshly generated one, in
        # constructor order. `trailing` is appended after the payload.
        generators = (
            ("version", any_version),
            ("type", any_type),
            ("code", any_code),
            ("message_id", any_message_id),
            ("token", any_token),
            ("options", any_options),
            ("payload", any_payload),
        )
        positional = [pinned[name] if name in pinned else make() for name, make in generators]
        positional.extend(trailing)
        return coap.CoapMessage(*positional)

    def test_should_return_version_value_when_version_property_is_called(self):
        # GIVEN
        version = any_version()
        message = self._build_message(version=version)

        # WHEN
        observed = message.version

        # THEN
        self.assertEqual(version, observed)

    def test_should_return_type_value_when_type_property_is_called(self):
        # GIVEN
        message_type = any_type()
        message = self._build_message(type=message_type)

        # WHEN
        observed = message.type

        # THEN
        self.assertEqual(message_type, observed)

    def test_should_return_code_value_when_code_property_is_called(self):
        # GIVEN
        code = any_code()
        message = self._build_message(code=code)

        # WHEN
        observed = message.code

        # THEN
        self.assertEqual(code, observed)

    def test_should_return_message_id_value_when_message_id_property_is_called(self):
        # GIVEN
        message_id = any_message_id()
        message = self._build_message(message_id=message_id)

        # WHEN
        observed = message.message_id

        # THEN
        self.assertEqual(message_id, observed)

    def test_should_return_token_value_when_token_property_is_called(self):
        # GIVEN
        token = any_token()
        message = self._build_message(token=token)

        # WHEN
        observed = message.token

        # THEN
        self.assertEqual(token, observed)

    def test_should_return_tkl_value_when_tkl_property_is_called(self):
        # GIVEN
        token = any_token()
        message = self._build_message(token=token)

        # WHEN
        observed = message.tkl

        # THEN the token length equals the number of token bytes
        self.assertEqual(len(token), observed)

    def test_should_return_options_value_when_options_property_is_called(self):
        # GIVEN
        options = any_options()
        message = self._build_message(options=options)

        # WHEN
        observed = message.options

        # THEN
        self.assertEqual(options, observed)

    def test_should_return_payload_value_when_payload_property_is_called(self):
        # GIVEN
        payload = any_payload()
        message = self._build_message(payload=payload)

        # WHEN
        observed = message.payload

        # THEN
        self.assertEqual(payload, observed)

    def test_should_return_uri_path_value_when_uri_path_property_is_called(self):
        # GIVEN the URI path passed as the extra, eighth positional argument
        uri_path = any_uri_path()
        message = self._build_message(uri_path)

        # WHEN
        observed = message.uri_path

        # THEN
        self.assertEqual(uri_path, observed)
484
485
class TestCoapMessageIdToUriPathBinder(unittest.TestCase):
    """Tests for storing and looking up URI paths in ``coap.CoapMessageIdToUriPathBinder``."""

    def test_should_add_uri_path_to_binds_when_add_uri_path_for_method_is_called(self):
        # GIVEN
        message_id = any_message_id()
        token = any_token()
        uri_path = any_uri_path()

        binder = coap.CoapMessageIdToUriPathBinder()

        # WHEN
        binder.add_uri_path_for(message_id, token, uri_path)

        # THEN the path is retrievable under the same (message id, token) pair
        self.assertEqual(uri_path, binder.get_uri_path_for(message_id, token))

    def test_should_raise_KeyError_when_get_uri_path_for_is_called_but_it_is_not_present_in_database(self):
        # NOTE: the test name mentions KeyError, but the exception asserted
        # below is RuntimeError. The name is kept so existing test selectors
        # keep working.

        # GIVEN a binder with nothing registered
        message_id = any_message_id()
        token = any_token()

        binder = coap.CoapMessageIdToUriPathBinder()

        # WHEN / THEN
        with self.assertRaises(RuntimeError):
            binder.get_uri_path_for(message_id, token)
512
513
class TestCoapMessageFactory(unittest.TestCase):
    """Tests for ``coap.CoapMessageFactory.parse`` on a captured request and response."""

    def _create_dummy_payload_factory(self):
        # Returns a payload factory stub whose parse() hands back the unread
        # remainder of the stream unchanged.

        class DummyPayloadFactory:

            def parse(self, data, message_info):
                return data.read()

        return DummyPayloadFactory()

    def _create_coap_message_factory(self, message_id_to_uri_path_binder=None):
        # Builds the factory under test with the dummy payload factory
        # registered for "/a/as". A fresh binder is created unless the caller
        # supplies a pre-populated one.
        if message_id_to_uri_path_binder is None:
            message_id_to_uri_path_binder = coap.CoapMessageIdToUriPathBinder()

        return coap.CoapMessageFactory(
            options_factory=coap.CoapOptionsFactory(),
            uri_path_based_payload_factories={"/a/as": self._create_dummy_payload_factory()},
            message_id_to_uri_path_binder=message_id_to_uri_path_binder,
        )

    def test_should_create_CoapMessage_from_solicit_request_data_when_parse_method_is_called(self):
        # GIVEN
        payload = bytearray([
            0x01, 0x08, 0x16, 0x6E, 0x0A, 0x00, 0x00, 0x00, 0x00, 0x02, 0x04, 0x01, 0x02,
        ])

        data = bytearray([
            0x42, 0x02, 0x00, 0xBD,  # initial byte, code, message id (189)
            0x65, 0xee,  # token
            0xB1, 0x61,  # first option, value "a"
            0x02, 0x61, 0x73,  # second option, value "as"
            0xff,  # byte separating options from payload
        ]) + payload

        factory = self._create_coap_message_factory()

        # WHEN
        coap_message = factory.parse(io.BytesIO(data), None)

        # THEN
        self.assertEqual(1, coap_message.version)
        self.assertEqual(0, coap_message.type)
        self.assertEqual(2, coap_message.tkl)
        self.assertEqual(2, coap_message.code)
        self.assertEqual(189, coap_message.message_id)
        self.assertEqual(bytearray([0x65, 0xee]), coap_message.token)
        self.assertEqual("a", coap_message.options[0].value.decode("utf-8"))
        self.assertEqual("as", coap_message.options[1].value.decode("utf-8"))
        self.assertEqual("/a/as", coap_message.uri_path)
        self.assertEqual(payload, coap_message.payload)

    def test_should_create_CoapMessage_from_solicit_response_data_when_parse_method_is_called(self):
        # GIVEN
        payload = bytearray([
            0x04, 0x01, 0x00, 0x02, 0x02, 0x00, 0x00, 0x07, 0x09,
            0x76, 0x80, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00,
        ])

        data = bytearray([
            0x62, 0x44, 0x00, 0xBD,  # initial byte, code, message id (189)
            0x65, 0xee,  # token
            0xff,  # byte separating (absent) options from payload
        ]) + payload

        # The response carries no options, so the binder is pre-populated
        # with the URI path for this message id and token.
        mid_binder = coap.CoapMessageIdToUriPathBinder()
        mid_binder.add_uri_path_for(189, bytearray([0x65, 0xee]), "/a/as")

        factory = self._create_coap_message_factory(mid_binder)

        # WHEN
        coap_message = factory.parse(io.BytesIO(data), None)

        # THEN
        self.assertEqual(1, coap_message.version)
        self.assertEqual(2, coap_message.type)
        self.assertEqual(2, coap_message.tkl)
        self.assertEqual("2.04", coap_message.code)
        self.assertEqual(189, coap_message.message_id)
        self.assertEqual(bytearray([0x65, 0xee]), coap_message.token)
        self.assertIsNone(coap_message.uri_path)
        self.assertEqual(payload, coap_message.payload)
669
670
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
673