1"""
2Common test fixtures
3####################
4
5Copyright (c) 2023 Nordic Semiconductor ASA
6
7SPDX-License-Identifier: Apache-2.0
8
9"""
10
import binascii
import logging
import os
import random
import string
import time
from typing import Iterator

import pytest
from leshan import Leshan

from twister_harness import Shell
from twister_harness import DeviceAdapter
22
# Default Leshan server address; can be overridden with --leshan_addr.
LESHAN_IP: str = '192.0.2.2'
# Port used when building non-secure coap:// server URIs.
COAP_PORT: int = 5683
# Port used in the coaps:// server URI handed to the bootstrap server.
COAPS_PORT: int = 5684
# Port used in the coaps:// bootstrap server URI written to the device.
BOOTSTRAP_COAPS_PORT: int = 5784

# Module-level logger used by the fixtures below.
logger = logging.getLogger(__name__)
29
def pytest_addoption(parser):
    """Register the command line options read by the fixtures in this file.

    Each option uses the ``store`` action and falls back to the default
    listed in the table below when it is not given.
    """
    option_defaults = (
        ('--leshan_addr', LESHAN_IP),
        ('--leshan_rest_api', 'http://localhost:8080/api'),
        ('--leshan_bootstrap_rest_api', 'http://localhost:8081/api'),
        ('--passwd', ''),
    )
    for flag, fallback in option_defaults:
        parser.addoption(flag, action='store', default=fallback)
35
class Endpoint:
    """State of one client endpoint that is driven through the device shell.

    Keeps the endpoint name, its registered/bootstrap flags, the shell used
    to send commands and the time the last update command was sent.
    Converting the object to ``str`` gives the endpoint name.
    """

    def __init__(self, name: str, shell: Shell, registered: bool = False, bootstrap: bool = False):
        self.name, self.shell = name, shell
        self.registered, self.bootstrap = registered, bootstrap
        # 0.0 is far in the past on the time.time() scale used below.
        self.last_update = 0.0

    def __str__(self):
        """Return the endpoint name."""
        return self.name

    def check_update(self):
        """Send ``lwm2m update`` if registered and the last one is over 5 s old."""
        overdue = self.registered and self.last_update < time.time() - 5
        if overdue:
            self.shell.exec_command('lwm2m update')
            # Stamp after the command has been executed.
            self.last_update = time.time()
53
54
@pytest.fixture(scope='session')
def leshan(request) -> Leshan:
    """
    Session-scoped Leshan object for interacting with the Leshan server.

    The REST API address is read from the ``--leshan_rest_api`` option.
    The requesting test is skipped when creating the object raises
    ``RuntimeError``.

    :return: The Leshan object.
    :rtype: Leshan
    """
    try:
        server = Leshan(request.config.getoption('--leshan_rest_api'))
    except RuntimeError:
        pytest.skip('Leshan server not available')
    else:
        return server
67
@pytest.fixture(scope='session')
def leshan_bootstrap(request) -> Leshan:
    """
    Session-scoped Leshan object for interacting with the Bootstrap Leshan server.

    The REST API address is read from the ``--leshan_bootstrap_rest_api``
    option. The requesting test is skipped when creating the object raises
    ``RuntimeError``.

    :return: The Leshan object.
    :rtype: Leshan
    """
    try:
        bootstrap_server = Leshan(request.config.getoption('--leshan_bootstrap_rest_api'))
    except RuntimeError:
        pytest.skip('Leshan Bootstrap server not available')
    else:
        return bootstrap_server
80
@pytest.fixture(scope='module')
def helperclient(request) -> object:
    """
    Module-scoped CoAPthon helper client pointed at the Leshan address.

    The client targets the address from ``--leshan_addr`` on the non-secure
    CoAP port. The requesting test is skipped when the CoAPthon package
    cannot be imported.

    :return: The helper client object.
    :rtype: object
    """
    # Imported lazily so that the package is only needed by tests using it.
    try:
        from coapthon.client.helperclient import HelperClient
    except ModuleNotFoundError:
        pytest.skip('CoAPthon3 package not installed')
    server_addr = request.config.getoption('--leshan_addr')
    return HelperClient(server=(server_addr, COAP_PORT))
94
95
@pytest.fixture(scope='module')
def endpoint_nosec(request, shell: Shell, dut: DeviceAdapter, leshan: Leshan) -> Iterator[Endpoint]:
    """
    Fixture that yields an endpoint started in non-secure mode.

    The client is configured over the device shell to use the coap:// URI of
    the address given with ``--leshan_addr``, started without bootstrap and
    stopped again when the requesting module is finished.

    The ``leshan`` fixture is requested but not used directly here.

    :return: Iterator yielding the started Endpoint.
    :raises Exception: Whatever the registration wait raises; the client is
        stopped before the error is propagated.
    """
    # Allow engine to start & stop once.
    time.sleep(2)

    # Generate random device id (no PSK is needed in non-secure mode)
    ep = 'client_' + binascii.b2a_hex(os.urandom(1)).decode()

    #
    # Registration Interface test cases (using Non-secure mode)
    #
    addr = request.config.getoption('--leshan_addr')
    shell.exec_command(f'lwm2m write 0/0/0 -s coap://{addr}:{COAP_PORT}')
    shell.exec_command('lwm2m write 0/0/1 -b 0')
    shell.exec_command('lwm2m write 0/0/2 -u8 3')
    shell.exec_command(f'lwm2m write 0/0/3 -s {ep}')
    shell.exec_command('lwm2m create 1/0')
    shell.exec_command('lwm2m write 0/0/10 -u16 1')
    shell.exec_command('lwm2m write 1/0/0 -u16 1')
    shell.exec_command('lwm2m write 1/0/1 -u32 86400')
    shell.exec_command(f'lwm2m start {ep} -b 0')

    try:
        dut.readlines_until(regex='.*Registration Done', timeout=5.0)
    except Exception:
        # The code after ``yield`` never runs when setup fails, so stop the
        # client here instead of leaving it running for later modules.
        shell.exec_command('lwm2m stop')
        raise

    yield Endpoint(ep, shell)

    # All done
    shell.exec_command('lwm2m stop')
    dut.readlines_until(regex=r'.*Deregistration success', timeout=10.0)
125
@pytest.fixture(scope='module')
def endpoint_bootstrap(request, shell: Shell, dut: DeviceAdapter, leshan: Leshan, leshan_bootstrap: Leshan) -> Iterator[Endpoint]:
    """
    Fixture that yields an endpoint that starts the bootstrap.

    Device entries are created on the Leshan and Bootstrap servers, the
    bootstrap server URI and PSK are written to the device and the client is
    started in bootstrap mode. The value of ``--passwd``, when non-empty, is
    used for both PSKs; otherwise two random 16-letter keys are generated.

    On teardown the client is stopped and both server entries are removed.
    The removal is attempted even if setup or teardown fails part-way.

    :return: Iterator yielding the started Endpoint.
    """
    # Everything needed by the cleanup below is computed before the ``try``
    # so that the ``finally`` clause can never see an unbound name.
    static_passwd = request.config.getoption('--passwd')
    addr = request.config.getoption('--leshan_addr')

    # Generate random device id and password (PSK key)
    ep = 'client_' + binascii.b2a_hex(os.urandom(1)).decode()
    if static_passwd:
        bs_passwd = static_passwd
        passwd = static_passwd
    else:
        bs_passwd = ''.join(random.choice(string.ascii_lowercase) for _ in range(16))
        passwd = ''.join(random.choice(string.ascii_lowercase) for _ in range(16))

    logger.debug('Endpoint: %s', ep)
    logger.debug('Boostrap PSK: %s', binascii.b2a_hex(bs_passwd.encode()).decode())
    logger.debug('PSK: %s', binascii.b2a_hex(passwd.encode()).decode())

    try:
        # Create device entries in Leshan and Bootstrap server
        leshan_bootstrap.create_bs_device(ep, f'coaps://{addr}:{COAPS_PORT}', bs_passwd, passwd)
        leshan.create_psk_device(ep, passwd)

        # Allow engine to start & stop once.
        time.sleep(2)

        # Write bootstrap server information and PSK keys
        shell.exec_command(f'lwm2m write 0/0/0 -s coaps://{addr}:{BOOTSTRAP_COAPS_PORT}')
        shell.exec_command('lwm2m write 0/0/1 -b 1')
        shell.exec_command('lwm2m write 0/0/2 -u8 0')
        shell.exec_command(f'lwm2m write 0/0/3 -s {ep}')
        shell.exec_command(f'lwm2m write 0/0/5 -s {bs_passwd}')
        shell.exec_command(f'lwm2m start {ep} -b 1')

        yield Endpoint(ep, shell)

        shell.exec_command('lwm2m stop')
        dut.readlines_until(regex=r'.*Deregistration success', timeout=10.0)

    finally:
        # Remove device and bootstrap information
        # Leshan does not accept non-secure connection if device information is provided with PSK
        try:
            leshan.delete_device(ep)
        finally:
            # Still remove the bootstrap entry if the deletion above raised.
            leshan_bootstrap.delete_bs_device(ep)
170
@pytest.fixture(scope='module')
def endpoint_registered(endpoint_bootstrap: Endpoint, dut: DeviceAdapter) -> Endpoint:
    """
    Fixture that returns the bootstrapped endpoint once it is registered.

    If the endpoint is not yet flagged as registered, waits up to 5 seconds
    for 'Registration Done' in the device output and then sets both the
    ``bootstrap`` and ``registered`` flags on it.

    :return: The same Endpoint object that ``endpoint_bootstrap`` provided.
    """
    if not endpoint_bootstrap.registered:
        dut.readlines_until(regex='.*Registration Done', timeout=5.0)
        endpoint_bootstrap.bootstrap = True
        endpoint_bootstrap.registered = True
    return endpoint_bootstrap
179
@pytest.fixture(scope='function')
def endpoint(endpoint_registered: Endpoint) -> Endpoint:
    """
    Per-test fixture that returns the registered endpoint.

    Before handing the endpoint to the test, ``check_update()`` is called on
    it, which may send an update command to the device.

    :return: The same Endpoint object that ``endpoint_registered`` provided.
    """
    endpoint_registered.check_update()
    return endpoint_registered
185
@pytest.fixture(scope='module')
def configuration_C13(endpoint_registered: Endpoint, shell: Shell) -> Iterator[Endpoint]:
    """
    Fixture that yields the registered endpoint with the C13 configuration.

    Creates object instance /16/0 with four instances (0..3) of resource 0
    over the device shell, writes a text value to each of them and deletes
    /16/0 again when the requesting module is finished.

    :return: Iterator yielding the Endpoint from ``endpoint_registered``.
    """
    # Values written to /16/0/0/<index>, in index order.
    host_device_values = (
        'Host Device ID #1',
        'Host Device Manufacturer #1',
        'Host Device Model #1',
        'Host Device Software Version #1',
    )

    shell.exec_command('lwm2m create /16/0')
    # All resource instances are created first, then written, in index order.
    for idx in range(len(host_device_values)):
        shell.exec_command(f'lwm2m create /16/0/0/{idx}')
    for idx, value in enumerate(host_device_values):
        shell.exec_command(f'lwm2m write /16/0/0/{idx} "{value}"')

    yield endpoint_registered

    shell.exec_command('lwm2m delete /16/0')
200