"""
Tests for Block-Wise transfers in LwM2M
#######################################

Copyright (c) 2024 Nordic Semiconductor ASA

SPDX-License-Identifier: Apache-2.0


"""

import binascii
import contextlib
import logging
import random
import re
import string
import time
import zlib

from leshan import Leshan
from twister_harness import DeviceAdapter, Shell

logger = logging.getLogger(__name__)

# Payload written to the Firmware object's Package resource (5/0/0).
_FIRMWARE = b'1234567890' * 500

# Size of the random payload written to the BinaryAppData object (19/0/0/0).
_APP_DATA_SIZE = 4096


@contextlib.contextmanager
def _leshan_settings(leshan: Leshan, fmt: str, timeout: int):
    """Temporarily override ``leshan.format`` and ``leshan.timeout``.

    The previous values are restored when the ``with`` block exits, even if
    the body raises (e.g. a failing assertion), so the ``leshan`` object does
    not carry modified settings into subsequent tests.
    """
    saved_format = leshan.format
    saved_timeout = leshan.timeout
    leshan.format = fmt
    leshan.timeout = timeout
    try:
        yield
    finally:
        leshan.format = saved_format
        leshan.timeout = saved_timeout


def _random_app_data(size: int = _APP_DATA_SIZE) -> bytes:
    """Return ``size`` bytes of random ASCII letters."""
    return ''.join(random.choices(string.ascii_letters, k=size)).encode()


def _read_reported_crc(dut: DeviceAdapter) -> int:
    """Wait for the firmware update log line and return the CRC it reports.

    Fails the test with a descriptive message when the matched line carries
    no ``CRC <number>`` field, instead of raising ``AttributeError``.
    """
    lines = dut.readlines_until(regex='app_fw_update: UPDATE', timeout=5.0)
    assert len(lines) > 0
    last_line = lines[-1]
    match = re.search(r'CRC ([0-9]+)', last_line)
    assert match is not None, f'No CRC reported in line: {last_line!r}'
    return int(match.group(1))


def test_blockwise_1(shell: Shell, dut: DeviceAdapter, leshan: Leshan, endpoint: str):
    """Blockwise test 1: Block-Wise PUT using OPAQUE content format"""

    with _leshan_settings(leshan, 'OPAQUE', 600):
        leshan.write(endpoint, '5/0/0', _FIRMWARE)
        # Our Firmware object prints out the CRC of the received firmware
        # when Update is executed
        leshan.execute(endpoint, '5/0/2')
        crc = _read_reported_crc(dut)
        # Verify that CRC matches
        assert crc == zlib.crc32(_FIRMWARE)


def test_blockwise_2(shell: Shell, dut: DeviceAdapter, leshan: Leshan, endpoint: str):
    """Blockwise test 2: Block-Wise PUT with retry"""

    # Set timeout to 1 second to force Leshan to stop sending
    with _leshan_settings(leshan, 'OPAQUE', 1):
        try:
            leshan.write(endpoint, '5/0/0', _FIRMWARE)
        except Exception as e:
            # The write is expected to be interrupted by the short timeout;
            # the exception type raised by the Leshan client is not known
            # here, so the broad catch is kept deliberately.
            logger.debug('Caught exception: %s', e)
        shell.exec_command('lwm2m update')
        time.sleep(1)
        # Now send the firmware again using longer timeout
        leshan.timeout = 600
        leshan.write(endpoint, '5/0/0', _FIRMWARE)
        # Our Firmware object prints out the CRC of the received firmware
        # when Update is executed
        leshan.execute(endpoint, '5/0/2')
        crc = _read_reported_crc(dut)
        # Verify that CRC matches
        assert crc == zlib.crc32(_FIRMWARE)


def test_blockwise_3(shell: Shell, dut: DeviceAdapter, leshan: Leshan, endpoint: str):
    """Blockwise test 3: Block-Wise Get using TLV and SenML-CBOR content formats"""

    shell.exec_command('lwm2m create /19/0')
    try:
        # Wait for update to finish so server is aware of the /19/0 object
        dut.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=5.0)

        # Generate 4 kB of binary app-data
        # and write it into BinaryAppData object
        payload = _random_app_data()
        with _leshan_settings(leshan, 'OPAQUE', 600):
            leshan.write(endpoint, '19/0/0/0', payload)

            # Verify data is correctly transferred
            lines = shell.get_filtered_output(
                shell.exec_command('lwm2m read /19/0/0/0 -crc32')
            )
            crc = int(lines[0])
            assert crc == zlib.crc32(payload)

            # Try reading the data using different content formats.
            # The loop variable must not reuse the name holding the saved
            # format, otherwise the wrong format gets restored afterwards.
            for read_format in ['TLV', 'SENML_CBOR']:
                leshan.format = read_format
                response = leshan.read(endpoint, '19/0/0')
                received = binascii.a2b_hex(response[0][0])
                assert crc == zlib.crc32(received)
    finally:
        # Remove the object instance even when the test fails so that later
        # tests can create /19/0 again.
        shell.exec_command('lwm2m delete /19/0')


def test_blockwise_4(shell: Shell, dut: DeviceAdapter, leshan: Leshan, endpoint: str):
    """Blockwise test 4: Block-Wise SEND using SenML-CBOR content format"""

    shell.exec_command('lwm2m create /19/0')
    try:
        # Wait for update to finish so server is aware of the /19/0 object
        dut.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=5.0)

        # Generate 4 kB of binary app-data
        payload = _random_app_data()
        with _leshan_settings(leshan, 'OPAQUE', 600):
            leshan.write(endpoint, '19/0/0/0', payload)
            leshan.format = 'SENML_CBOR'

            # Execute SEND and verify that correct data is received
            with leshan.get_event_stream(endpoint) as events:
                shell.exec_command('lwm2m send /19/0')
                dut.readlines_until(regex=r'.*SEND status: 0', timeout=5.0)
                send = events.next_event('SEND')
            assert send is not None
            content = binascii.a2b_hex(send[19][0][0][0])
            assert zlib.crc32(content) == zlib.crc32(payload)
    finally:
        # Remove the object instance even when the test fails so that later
        # tests can create /19/0 again.
        shell.exec_command('lwm2m delete /19/0')