1"""
2Tests for Block-Wise transfers in LwM2M
3#######################################
4
5Copyright (c) 2024 Nordic Semiconductor ASA
6
7SPDX-License-Identifier: Apache-2.0
8
9
10"""
11
12import binascii
13import logging
14import random
15import re
16import string
17import time
18import zlib
19
20from leshan import Leshan
21from twister_harness import DeviceAdapter, Shell
22
23logger = logging.getLogger(__name__)
24
25def test_blockwise_1(shell: Shell, dut: DeviceAdapter, leshan: Leshan, endpoint: str):
26    """Blockwise test 1: Block-Wise PUT using OPAQUE content format"""
27
28    fw = b'1234567890' * 500
29    fmt = leshan.format
30    to = leshan.timeout
31    leshan.format = 'OPAQUE'
32    leshan.timeout = 600
33    leshan.write(endpoint, '5/0/0', fw)
34    # Our Firmware object prints out the CRC of the received firmware
35    # when Update is executed
36    leshan.execute(endpoint, '5/0/2')
37    lines = dut.readlines_until(regex='app_fw_update: UPDATE', timeout=5.0)
38    assert len(lines) > 0
39    line = lines[-1]
40    crc = int(re.search('CRC ([0-9]+)', line).group(1))
41    # Verify that CRC matches
42    assert crc == zlib.crc32(fw)
43    leshan.format =  fmt
44    leshan.timeout = to
45
46def test_blockwise_2(shell: Shell, dut: DeviceAdapter, leshan: Leshan, endpoint: str):
47    """Blockwise test 2: Block-Wise PUT with retry"""
48
49    fw = b'1234567890' * 500
50    fmt = leshan.format
51    to = leshan.timeout
52    leshan.format = 'OPAQUE'
53    # Set timeout to 1 second to force Leshan to stop sending
54    leshan.timeout = 1
55    try:
56        leshan.write(endpoint, '5/0/0', fw)
57    except Exception as e:
58        logger.debug(f'Caught exception: {e}')
59        shell.exec_command('lwm2m update')
60        time.sleep(1)
61    # Now send the firmware again using longer timeout
62    leshan.timeout = 600
63    leshan.write(endpoint, '5/0/0', fw)
64    # Our Firmware object prints out the CRC of the received firmware
65    # when Update is executed
66    leshan.execute(endpoint, '5/0/2')
67    lines = dut.readlines_until(regex='app_fw_update: UPDATE', timeout=5.0)
68    assert len(lines) > 0
69    line = lines[-1]
70    crc = int(re.search('CRC ([0-9]+)', line).group(1))
71    # Verify that CRC matches
72    assert crc == zlib.crc32(fw)
73    leshan.format =  fmt
74    leshan.timeout = to
75
76
77def test_blockwise_3(shell: Shell, dut: DeviceAdapter, leshan: Leshan, endpoint: str):
78    """Blockwise test 3: Block-Wise Get using TLV and SenML-CBOR content formats"""
79
80    shell.exec_command('lwm2m create /19/0')
81    # Wait for update to finish so server is aware of the /19/0 object
82    dut.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=5.0)
83
84    # Generate 4 kB of binary app-data
85    # and write it into BinaryAppData object
86    data = ''.join(random.choice(string.ascii_letters) for i in range(4096)).encode()
87    fmt = leshan.format
88    to = leshan.timeout
89    leshan.format = 'OPAQUE'
90    leshan.timeout = 600
91    leshan.write(endpoint, '19/0/0/0', data)
92
93    # Verify data is correctly transferred
94    lines = shell.get_filtered_output(shell.exec_command('lwm2m read /19/0/0/0 -crc32'))
95    crc = int(lines[0])
96    assert crc == zlib.crc32(data)
97
98    # Try reading the data using different content formats
99    for fmt in ['TLV', 'SENML_CBOR']:
100        leshan.format = fmt
101        data = leshan.read(endpoint, '19/0/0')
102        data = binascii.a2b_hex(data[0][0])
103        assert crc == zlib.crc32(data)
104
105    leshan.format =  fmt
106    leshan.timeout = to
107    shell.exec_command('lwm2m delete /19/0')
108
109def test_blockwise_4(shell: Shell, dut: DeviceAdapter, leshan: Leshan, endpoint: str):
110    """Blockwise test 4: Block-Wise SEND using SenML-CBOR content format"""
111
112    shell.exec_command('lwm2m create /19/0')
113    # Wait for update to finish so server is aware of the /19/0 object
114    dut.readlines_until(regex='.*net_lwm2m_rd_client: Update Done', timeout=5.0)
115
116    # Generate 4 kB of binary app-data
117    data = ''.join(random.choice(string.ascii_letters) for i in range(4096)).encode()
118    fmt = leshan.format
119    to = leshan.timeout
120    leshan.format = 'OPAQUE'
121    leshan.timeout = 600
122    leshan.write(endpoint, '19/0/0/0', data)
123    leshan.format = 'SENML_CBOR'
124
125    # Execute SEND and verify that correct data is received
126    with leshan.get_event_stream(endpoint) as events:
127        shell.exec_command('lwm2m send /19/0')
128        dut.readlines_until(regex=r'.*SEND status: 0', timeout=5.0)
129        send = events.next_event('SEND')
130        assert send is not None
131        content = binascii.a2b_hex(send[19][0][0][0])
132        assert zlib.crc32(content) == zlib.crc32(data)
133
134    leshan.format =  fmt
135    leshan.timeout = to
136
137    shell.exec_command('lwm2m delete /19/0')
138