1#!/usr/bin/env python3
2#
3#  Copyright (c) 2021, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import config
33import thread_cert
34
35# Test description:
36#   This test verifies the SRP client lease changing works as expected.
37#
38# Topology:
39#     LEADER (SRP server)
40#       |
41#       |
42#     ROUTER (SRP client)
43#
44from pktverify.packet_filter import PacketFilter
45
46SERVER = 1
47CLIENT = 2
48
49DEFAULT_LEASE_TIME = 7200
50LEASE_TIME = 60
51NEW_LEASE_TIME = 120
52KEY_LEASE_TIME = 240
53NEW_TTL = 10
54
55assert LEASE_TIME < KEY_LEASE_TIME
56assert NEW_LEASE_TIME < KEY_LEASE_TIME
57assert NEW_TTL < NEW_LEASE_TIME
58
59
60class SrpClientChangeLeaseTime(thread_cert.TestCase):
61    USE_MESSAGE_FACTORY = False
62    SUPPORT_NCP = False
63
64    TOPOLOGY = {
65        SERVER: {
66            'name': 'SRP_SERVER',
67            'networkkey': '00112233445566778899aabbccddeeff',
68            'mode': 'rdn',
69        },
70        CLIENT: {
71            'name': 'SRP_CLIENT',
72            'networkkey': '00112233445566778899aabbccddeeff',
73            'mode': 'rdn',
74        },
75    }
76
77    def test(self):
78        server = self.nodes[SERVER]
79        client = self.nodes[CLIENT]
80
81        #
82        # 0. Start the server and client devices.
83        #
84
85        server.srp_server_set_enabled(True)
86        server.srp_server_set_lease_range(LEASE_TIME, LEASE_TIME, KEY_LEASE_TIME, KEY_LEASE_TIME)
87        server.start()
88        self.simulator.go(config.LEADER_STARTUP_DELAY)
89        self.assertEqual(server.get_state(), 'leader')
90        self.simulator.go(5)
91
92        client.start()
93        self.simulator.go(config.ROUTER_STARTUP_DELAY)
94        self.assertEqual(client.get_state(), 'router')
95
96        #
97        # 1. Register a single service and verify that it works.
98        #
99
100        self.assertEqual(client.srp_client_get_auto_start_mode(), 'Enabled')
101
102        client.srp_client_set_host_name('my-host')
103        client.srp_client_set_host_address('2001::1')
104        client.srp_client_add_service('my-service', '_ipps._tcp', 12345)
105        self.simulator.go(2)
106
107        self.check_host_and_service(server, client)
108
109        server.srp_server_set_lease_range(NEW_LEASE_TIME, NEW_LEASE_TIME, KEY_LEASE_TIME, KEY_LEASE_TIME)
110        client.srp_client_set_lease_interval(NEW_LEASE_TIME)
111        self.simulator.go(NEW_LEASE_TIME)
112        self.assertEqual(client.srp_client_get_host_state(), 'Registered')
113
114        self.simulator.go(KEY_LEASE_TIME * 2)
115        self.assertEqual(client.srp_client_get_host_state(), 'Registered')
116
117        client.srp_client_set_ttl(NEW_TTL)
118        self.simulator.go(NEW_LEASE_TIME)
119        self.assertEqual(client.srp_client_get_host_state(), 'Registered')
120
121        client.srp_client_set_ttl(0)
122        self.simulator.go(NEW_LEASE_TIME)
123        self.assertEqual(client.srp_client_get_host_state(), 'Registered')
124
125    def verify(self, pv):
126        pkts: PacketFilter = pv.pkts
127        pv.summary.show()
128
129        CLIENT_SRC64 = pv.vars['SRP_CLIENT']
130
131        pkts.filter_wpan_src64(CLIENT_SRC64).filter('dns.flags.response == 0 and dns.resp.ttl == {DEFAULT_LEASE_TIME}',
132                                                    DEFAULT_LEASE_TIME=DEFAULT_LEASE_TIME).must_next()
133        pkts.filter_wpan_src64(CLIENT_SRC64).filter('dns.flags.response == 0 and dns.resp.ttl == {NEW_LEASE_TIME}',
134                                                    NEW_LEASE_TIME=NEW_LEASE_TIME).must_next()
135        pkts.filter_wpan_src64(CLIENT_SRC64).filter('dns.flags.response == 0 and dns.resp.ttl == {NEW_TTL}',
136                                                    NEW_TTL=NEW_TTL).must_next()
137        pkts.filter_wpan_src64(CLIENT_SRC64).filter('dns.flags.response == 0 and dns.resp.ttl == {NEW_LEASE_TIME}',
138                                                    NEW_LEASE_TIME=NEW_LEASE_TIME).must_next()
139
140    def check_host_and_service(self, server, client):
141        """Check that we have properly registered host and service instance.
142        """
143
144        client_services = client.srp_client_get_services()
145        print(client_services)
146        self.assertEqual(len(client_services), 1)
147        client_service = client_services[0]
148
149        # Verify that the client possesses correct service resources.
150        self.assertEqual(client_service['instance'], 'my-service')
151        self.assertEqual(client_service['name'], '_ipps._tcp')
152        self.assertEqual(int(client_service['port']), 12345)
153        self.assertEqual(int(client_service['priority']), 0)
154        self.assertEqual(int(client_service['weight']), 0)
155
156        # Verify that the client received a SUCCESS response for the server.
157        self.assertEqual(client_service['state'], 'Registered')
158
159
160if __name__ == '__main__':
161    unittest.main()
162