1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#
"""Unit tests for the option, code, message and factory classes of ``coap``."""

import io
import random
import string
import unittest

import coap

# 4-bit values that the tests below expect to be passed through unchanged by
# ``CoapOptionHeader._read_extended_value`` (13 and 14 are the two values for
# which the tests expect extra bytes to be consumed from the stream).
_PLAIN_4BIT_VALUES = tuple(v for v in range(16) if v not in (13, 14))


def any_delta():
    """Return a random 4-bit option delta."""
    return random.getrandbits(4)


def any_coap_option_type():
    """Return a random 4-bit value used as a CoAP option type."""
    return random.getrandbits(4)


def any_value():
    """Return a random 8-bit value."""
    return random.getrandbits(8)


def any_4bits_value_different_than_13_and_14():
    """Return a random value drawn uniformly from 0..15, excluding 13 and 14."""
    return random.choice(_PLAIN_4BIT_VALUES)


def any_4bits_value_lower_or_equal_than_12():
    """Return a random value drawn uniformly from 0..12 (inclusive)."""
    return random.randint(0, 12)


def any_bytearray(length):
    """Return a ``bytearray`` of ``length`` random bytes."""
    return bytearray([random.getrandbits(8) for _ in range(length)])


def any_version():
    """Return a random 2-bit message version."""
    return random.getrandbits(2)


def any_type():
    """Return a random 2-bit message type."""
    return random.getrandbits(2)


def any_code():
    """Return a random 8-bit message code."""
    return random.getrandbits(8)


def any_message_id():
    """Return a random 16-bit message id."""
    return random.getrandbits(16)


def any_token():
    """Return a random token of 0 to 8 bytes."""
    return any_bytearray(random.randint(0, 8))


def any_options():
    """Return an empty option list."""
    return []


def any_payload(length=None):
    """Return a random payload.

    Args:
        length: Number of bytes to generate. When ``None`` a random length
            between 0 and 64 (inclusive) is picked.
    """
    if length is None:
        length = random.randint(0, 64)
    return any_bytearray(length)


def any_uri_path():
    """Return a URI path made of ``/`` and one random lowercase ASCII letter."""
    return "/" + random.choice(string.ascii_lowercase)


class TestCoapMessageOptionHeader(unittest.TestCase):
    """Tests for ``coap.CoapOptionHeader``."""

    def test_should_return_passed_on_value_when_read_extended_value_is_called_with_value_different_than_13_and_14(
            self):
        # GIVEN
        value = any_4bits_value_different_than_13_and_14()

        # WHEN
        # No stream is supplied: for these values nothing is expected to be read.
        actual_value = coap.CoapOptionHeader._read_extended_value(None, value)

        # THEN
        self.assertEqual(value, actual_value)

    def test_should_return_value_stored_in_first_byte_plus_13_when_read_extended_value_is_called_with_value_equal_13(
            self):
        # GIVEN
        value = 13
        extended_value = any_value()

        data = io.BytesIO(bytearray([extended_value]))

        # WHEN
        actual_value = coap.CoapOptionHeader._read_extended_value(data, value)

        # THEN
        self.assertEqual(extended_value + 13, actual_value)

    def test_should_return_value_stored_in_first_byte_plus_269_when_read_extended_value_is_called_with_value_equal_14(
            self):
        # GIVEN
        value = 14
        extended_value = any_value()

        # Two extension bytes are supplied; the expectation below depends only
        # on the second one, the first is random filler.
        # NOTE(review): the test name says "first byte" - confirm intent.
        data = io.BytesIO(bytearray([any_value(), extended_value]))

        # WHEN
        actual_value = coap.CoapOptionHeader._read_extended_value(data, value)

        # THEN
        self.assertEqual(extended_value + 269, actual_value)

    def test_should_create_CoapOptionHeader_when_from_bytes_classmethod_is_called(self):
        # GIVEN
        delta = any_4bits_value_different_than_13_and_14()
        length = any_4bits_value_different_than_13_and_14()

        # Single header byte: delta in the high nibble, length in the low nibble.
        data = bytearray([delta << 4 | length])

        # WHEN
        option_header = coap.CoapOptionHeader.from_bytes(io.BytesIO(data))

        # THEN
        self.assertEqual(delta, option_header.delta)
        self.assertEqual(length, option_header.length)

    def test_should_return_True_when_is_payload_marker_property_called_with_delta_and_length_equal_15(self):
        # GIVEN
        delta = 15
        length = 15

        # Both nibbles set to 15 yields the byte 0xFF.
        data = bytearray([delta << 4 | length])

        # WHEN
        option_header = coap.CoapOptionHeader.from_bytes(io.BytesIO(data))

        # THEN
        self.assertTrue(option_header.is_payload_marker)


class TestCoapOption(unittest.TestCase):
    """Tests for the ``type`` and ``value`` properties of ``coap.CoapOption``."""

    def test_should_return_type_value_when_type_property_is_called(self):
        # GIVEN
        _type = any_coap_option_type()

        coap_opt = coap.CoapOption(_type, any_value())

        # WHEN
        actual_type = coap_opt.type

        # THEN
        self.assertEqual(_type, actual_type)

    def test_should_return_value_value_when_value_property_is_called(self):
        # GIVEN
        value = any_value()

        coap_opt = coap.CoapOption(any_coap_option_type(), value)

        # WHEN
        actual_value = coap_opt.value

        # THEN
        self.assertEqual(value, actual_value)


class TestCoapOptionsFactory(unittest.TestCase):
    """Tests for ``coap.CoapOptionsFactory.parse``."""

    def test_should_create_list_of_CoapOption_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        delta = any_4bits_value_lower_or_equal_than_12()
        length = any_4bits_value_lower_or_equal_than_12()
        value = any_bytearray(length)

        # One option: header byte (delta | length) followed by `length` value bytes.
        data = bytearray([delta << 4 | length]) + value

        factory = coap.CoapOptionsFactory()

        # WHEN
        coap_options = factory.parse(io.BytesIO(data), None)

        # THEN
        self.assertEqual(1, len(coap_options))
        # With a single option the expected type equals the encoded delta.
        self.assertEqual(delta, coap_options[0].type)
        self.assertEqual(value, coap_options[0].value)


class TestCoapCode(unittest.TestCase):
    """Tests for ``coap.CoapCode``.

    The expectations treat the 8-bit code as a 3-bit class in the high bits
    and a 5-bit detail in the low bits.
    """

    def test_should_return_code_value_when_code_property_is_called(self):
        # GIVEN
        code = any_code()

        code_obj = coap.CoapCode(code)

        # WHEN
        actual_code = code_obj.code

        # THEN
        self.assertEqual(code, actual_code)

    def test_should_return_class_value_when_class_property_is_called(self):
        # GIVEN
        code = any_code()

        code_obj = coap.CoapCode(code)

        # WHEN
        actual_class = code_obj._class

        # THEN
        self.assertEqual((code >> 5) & 0x7, actual_class)

    def test_should_return_detail_value_when_detail_property_is_called(self):
        # GIVEN
        code = any_code()

        code_obj = coap.CoapCode(code)

        # WHEN
        actual_detail = code_obj.detail

        # THEN
        self.assertEqual(code & 0x1F, actual_detail)

    def test_should_return_dotted_value_when_dotted_property_is_called(self):
        # GIVEN
        code = any_code()

        code_obj = coap.CoapCode(code)

        # WHEN
        actual_dotted = code_obj.dotted

        # THEN
        # The dotted form is "<class>.<detail>"; recombining both parts must
        # give back the original code.
        _class, detail = actual_dotted.split(".")
        self.assertEqual(code, (int(_class) << 5) | int(detail))

    def test_should_create_CoapCode_when_from_class_and_detail_classmethod_is_called(self):
        # GIVEN
        code = any_code()

        _class = (code >> 5) & 0x7
        detail = code & 0x1F

        # WHEN
        actual_coap_obj = coap.CoapCode.from_class_and_detail(_class, detail)

        # THEN
        self.assertEqual(code, actual_coap_obj.code)

    def test_should_create_CoapCode_when_from_dotted_string_classmethod_is_called(self):
        # GIVEN
        code = any_code()

        code_obj = coap.CoapCode(code)

        # WHEN
        # Round trip: code -> dotted string -> code.
        actual_coap_obj = coap.CoapCode.from_dotted(code_obj.dotted)

        # THEN
        self.assertEqual(code, actual_coap_obj.code)


class TestCoapMessage(unittest.TestCase):
    """Tests for the read-only properties of ``coap.CoapMessage``.

    Each test builds a message from random fields and checks that the property
    under test returns the value that was passed to the constructor.
    """

    def test_should_return_version_value_when_version_property_is_called(self):
        # GIVEN
        version = any_version()

        coap_message = coap.CoapMessage(
            version,
            any_type(),
            any_code(),
            any_message_id(),
            any_token(),
            any_options(),
            any_payload(),
        )

        # WHEN
        actual_version = coap_message.version

        # THEN
        self.assertEqual(version, actual_version)

    def test_should_return_type_value_when_type_property_is_called(self):
        # GIVEN
        _type = any_type()

        coap_message = coap.CoapMessage(
            any_version(),
            _type,
            any_code(),
            any_message_id(),
            any_token(),
            any_options(),
            any_payload(),
        )

        # WHEN
        actual_type = coap_message.type

        # THEN
        self.assertEqual(_type, actual_type)

    def test_should_return_code_value_when_code_property_is_called(self):
        # GIVEN
        code = any_code()

        coap_message = coap.CoapMessage(
            any_version(),
            any_type(),
            code,
            any_message_id(),
            any_token(),
            any_options(),
            any_payload(),
        )

        # WHEN
        actual_code = coap_message.code

        # THEN
        self.assertEqual(code, actual_code)

    def test_should_return_message_id_value_when_message_id_property_is_called(self):
        # GIVEN
        message_id = any_message_id()

        coap_message = coap.CoapMessage(
            any_version(),
            any_type(),
            any_code(),
            message_id,
            any_token(),
            any_options(),
            any_payload(),
        )

        # WHEN
        actual_message_id = coap_message.message_id

        # THEN
        self.assertEqual(message_id, actual_message_id)

    def test_should_return_token_value_when_token_property_is_called(self):
        # GIVEN
        token = any_token()

        coap_message = coap.CoapMessage(
            any_version(),
            any_type(),
            any_code(),
            any_message_id(),
            token,
            any_options(),
            any_payload(),
        )

        # WHEN
        actual_token = coap_message.token

        # THEN
        self.assertEqual(token, actual_token)

    def test_should_return_tkl_value_when_tkl_property_is_called(self):
        # GIVEN
        token = any_token()

        coap_message = coap.CoapMessage(
            any_version(),
            any_type(),
            any_code(),
            any_message_id(),
            token,
            any_options(),
            any_payload(),
        )

        # WHEN
        actual_tkl = coap_message.tkl

        # THEN
        # tkl is expected to be the length of the token in bytes.
        self.assertEqual(len(token), actual_tkl)

    def test_should_return_options_value_when_options_property_is_called(self):
        # GIVEN
        options = any_options()

        coap_message = coap.CoapMessage(
            any_version(),
            any_type(),
            any_code(),
            any_message_id(),
            any_token(),
            options,
            any_payload(),
        )

        # WHEN
        actual_options = coap_message.options

        # THEN
        self.assertEqual(options, actual_options)

    def test_should_return_payload_value_when_payload_property_is_called(self):
        # GIVEN
        payload = any_payload()

        coap_message = coap.CoapMessage(
            any_version(),
            any_type(),
            any_code(),
            any_message_id(),
            any_token(),
            any_options(),
            payload,
        )

        # WHEN
        actual_payload = coap_message.payload

        # THEN
        self.assertEqual(payload, actual_payload)

    def test_should_return_uri_path_value_when_uri_path_property_is_called(self):
        # GIVEN
        uri_path = any_uri_path()

        # The URI path is passed as an additional, eighth positional argument.
        coap_message = coap.CoapMessage(
            any_version(),
            any_type(),
            any_code(),
            any_message_id(),
            any_token(),
            any_options(),
            any_payload(),
            uri_path,
        )

        # WHEN
        actual_uri_path = coap_message.uri_path

        # THEN
        self.assertEqual(uri_path, actual_uri_path)


class TestCoapMessageIdToUriPathBinder(unittest.TestCase):
    """Tests for ``coap.CoapMessageIdToUriPathBinder``."""

    def test_should_add_uri_path_to_binds_when_add_uri_path_for_method_is_called(self):
        # GIVEN
        message_id = any_message_id()
        token = any_token()
        uri_path = any_uri_path()

        binder = coap.CoapMessageIdToUriPathBinder()

        # WHEN
        binder.add_uri_path_for(message_id, token, uri_path)

        # THEN
        self.assertEqual(uri_path, binder.get_uri_path_for(message_id, token))

    def test_should_raise_KeyError_when_get_uri_path_for_is_called_but_it_is_not_present_in_database(self):
        # NOTE(review): despite "KeyError" in the test name, the assertion
        # below expects RuntimeError. The name is kept so existing test IDs
        # stay stable - confirm against the binder and rename if appropriate.

        # GIVEN
        message_id = any_message_id()
        token = any_token()

        # Nothing is registered on this freshly created binder.
        binder = coap.CoapMessageIdToUriPathBinder()

        # THEN
        with self.assertRaises(RuntimeError):
            binder.get_uri_path_for(message_id, token)


class TestCoapMessageFactory(unittest.TestCase):
    """Tests for ``coap.CoapMessageFactory.parse`` using raw message bytes."""

    def _create_dummy_payload_factory(self):
        """Return a payload factory stub whose ``parse`` returns the unread bytes."""

        class DummyPayloadFactory:

            def parse(self, data, message_info):
                return data.read()

        return DummyPayloadFactory()

    def _create_coap_message_factory(self):
        """Return a message factory wired with the dummy payload factory for ``/a/as``."""
        return coap.CoapMessageFactory(
            options_factory=coap.CoapOptionsFactory(),
            uri_path_based_payload_factories={"/a/as": self._create_dummy_payload_factory()},
            message_id_to_uri_path_binder=coap.CoapMessageIdToUriPathBinder(),
        )

    def test_should_create_CoapMessage_from_solicit_request_data_when_parse_method_is_called(self):
        # GIVEN
        payload = bytearray([
            0x01,
            0x08,
            0x16,
            0x6E,
            0x0A,
            0x00,
            0x00,
            0x00,
            0x00,
            0x02,
            0x04,
            0x01,
            0x02,
        ])

        data = bytearray([
            0x42,  # version 1, type 0, token length 2
            0x02,  # code 2
            0x00,  # message id 189 (high byte)
            0xBD,  # message id 189 (low byte)
            0x65,  # token
            0xee,
            0xB1,  # option header: delta 11, length 1
            0x61,  # "a"
            0x02,  # option header: delta 0, length 2
            0x61,  # "as"
            0x73,
            0xff,  # payload marker
        ]) + payload

        factory = self._create_coap_message_factory()

        # WHEN
        coap_message = factory.parse(io.BytesIO(data), None)

        # THEN
        self.assertEqual(1, coap_message.version)
        self.assertEqual(0, coap_message.type)
        self.assertEqual(2, coap_message.tkl)
        self.assertEqual(2, coap_message.code)
        self.assertEqual(189, coap_message.message_id)
        self.assertEqual(bytearray([0x65, 0xee]), coap_message.token)
        self.assertEqual("a", coap_message.options[0].value.decode("utf-8"))
        self.assertEqual("as", coap_message.options[1].value.decode("utf-8"))
        # The two option values are expected to be joined into the URI path.
        self.assertEqual("/a/as", coap_message.uri_path)
        self.assertEqual(payload, coap_message.payload)

    def test_should_create_CoapMessage_from_solicit_response_data_when_parse_method_is_called(self):
        # GIVEN
        payload = bytearray([
            0x04,
            0x01,
            0x00,
            0x02,
            0x02,
            0x00,
            0x00,
            0x07,
            0x09,
            0x76,
            0x80,
            0x00,
            0x01,
            0x00,
            0x00,
            0x00,
            0x00,
            0x00,
        ])

        data = bytearray([
            0x62,  # version 1, type 2, token length 2
            0x44,  # code 0x44, i.e. class 2 / detail 4
            0x00,  # message id 189 (high byte)
            0xBD,  # message id 189 (low byte)
            0x65,  # token
            0xee,
            0xff,  # payload marker (no options precede it)
        ]) + payload

        # Register the URI path for the same message id and token that the
        # response bytes above carry.
        mid_binder = coap.CoapMessageIdToUriPathBinder()
        mid_binder.add_uri_path_for(189, bytearray([0x65, 0xee]), "/a/as")

        factory = coap.CoapMessageFactory(
            options_factory=coap.CoapOptionsFactory(),
            uri_path_based_payload_factories={"/a/as": self._create_dummy_payload_factory()},
            message_id_to_uri_path_binder=mid_binder,
        )

        # WHEN
        coap_message = factory.parse(io.BytesIO(data), None)

        # THEN
        self.assertEqual(1, coap_message.version)
        self.assertEqual(2, coap_message.type)
        self.assertEqual(2, coap_message.tkl)
        # The code is compared against its dotted string form here.
        self.assertEqual("2.04", coap_message.code)
        self.assertEqual(189, coap_message.message_id)
        self.assertEqual(bytearray([0x65, 0xee]), coap_message.token)
        # The response carries no options, and the message itself is expected
        # to report no URI path even though one is registered in the binder.
        self.assertEqual(None, coap_message.uri_path)
        self.assertEqual(payload, coap_message.payload)


if __name__ == "__main__":
    unittest.main()