1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import config
33import copy
34import mle
35import network_diag
36import network_layer
37import thread_cert
38from network_diag import TlvType
39from pktverify.consts import DIAG_GET_QRY_URI, DIAG_GET_ANS_URI, DG_MAC_EXTENDED_ADDRESS_TLV, DG_MAC_ADDRESS_TLV, DG_MODE_TLV, DG_CONNECTIVITY_TLV, DG_ROUTE64_TLV, DG_LEADER_DATA_TLV, DG_NETWORK_DATA_TLV, DG_IPV6_ADDRESS_LIST_TLV, DG_CHANNEL_PAGES_TLV, DG_TYPE_LIST_TLV, DG_MAC_COUNTERS_TLV, DG_TIMEOUT_TLV, DG_CHILD_TABLE_TLV, REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS
40from pktverify.packet_verifier import PacketVerifier
41from pktverify.utils import colon_hex
42
43LEADER = 1
44ROUTER1 = 2
45SED1 = 3
46MED1 = 4
47FED1 = 5
48
49MTDS = [MED1, SED1]
50
51# Test Purpose and Description:
52# -----------------------------
53# The purpose of this test case is to verify functionality of commands
54# Diagnostic_Get.query and Diagnostic_Get.ans. Thread Diagnostic commands
55# MUST be supported by FTDs.
56#
57# Test Topology:
58# -------------
59#        Leader
60#          |
61#  FED - Router
62#        /    \
63#      MED    SED
64#
65# DUT Types:
66# ----------
67#  Router
68#  FED
69
70
71class Cert_5_7_03_CoapDiagCommands_Base(thread_cert.TestCase):
72    USE_MESSAGE_FACTORY = False
73    SUPPORT_NCP = False
74
75    TOPOLOGY = {
76        LEADER: {
77            'name': 'LEADER',
78            'mode': 'rdn',
79            'allowlist': [ROUTER1],
80        },
81        ROUTER1: {
82            'mode': 'rdn',
83            'allowlist': [LEADER, SED1, MED1, FED1],
84        },
85        SED1: {
86            'name': 'SED',
87            'is_mtd': True,
88            'mode': '-',
89            'allowlist': [ROUTER1],
90            'timeout': config.DEFAULT_CHILD_TIMEOUT
91        },
92        MED1: {
93            'name': 'MED',
94            'is_mtd': True,
95            'mode': 'rn',
96            'allowlist': [ROUTER1]
97        },
98        FED1: {
99            'allowlist': [ROUTER1],
100            'router_upgrade_threshold': 0
101        },
102    }
103
104    def test(self):
105        # 1 - Form topology
106        self.nodes[LEADER].start()
107        self.simulator.go(5)
108        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
109
110        self.nodes[ROUTER1].start()
111        self.simulator.go(5)
112        self.assertEqual(self.nodes[ROUTER1].get_state(), 'router')
113
114        for i in range(3, 6):
115            self.nodes[i].start()
116            self.simulator.go(10)
117            self.assertEqual(self.nodes[i].get_state(), 'child')
118
119        self.simulator.go(config.MAX_ADVERTISEMENT_INTERVAL)
120
121        self.collect_rlocs()
122        self.collect_rloc16s()
123
124        tlv_types = [
125            TlvType.EXT_ADDRESS, TlvType.ADDRESS16, TlvType.MODE, TlvType.CONNECTIVITY, TlvType.ROUTE64,
126            TlvType.LEADER_DATA, TlvType.NETWORK_DATA, TlvType.IPV6_ADDRESS_LIST, TlvType.CHILD_TABLE,
127            TlvType.CHANNEL_PAGES
128        ]
129        if self.TOPOLOGY[FED1]['name'] == 'DUT':
130            tlv_types = [
131                TlvType.EXT_ADDRESS, TlvType.ADDRESS16, TlvType.MODE, TlvType.LEADER_DATA, TlvType.NETWORK_DATA,
132                TlvType.IPV6_ADDRESS_LIST, TlvType.CHANNEL_PAGES
133            ]
134
135        # 2 - Leader sends DIAG_GET.query
136        self.nodes[LEADER].send_network_diag_get(REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS, tlv_types)
137        self.simulator.go(2)
138
139    def verify(self, pv):
140        pkts = pv.pkts
141        pv.summary.show()
142
143        LEADER = pv.vars['LEADER']
144        LEADER_RLOC = pv.vars['LEADER_RLOC']
145        DUT = pv.vars['DUT']
146        DUT_RLOC16 = pv.vars['DUT_RLOC16']
147        SED = pv.vars['SED']
148
149        dut_addr16 = "%04x" % DUT_RLOC16
150
151        # Step 1: Ensure topology is formed correctly
152        if self.TOPOLOGY[ROUTER1]['name'] == 'DUT':
153            FED = pv.vars['FED']
154            pv.verify_attached('DUT', 'LEADER')
155            pv.verify_attached('SED', 'DUT', 'MTD')
156            pv.verify_attached('MED', 'DUT', 'MTD')
157            pv.verify_attached('FED', 'DUT', 'FTD-ED')
158        else:
159            ROUTER = pv.vars['ROUTER']
160            pv.verify_attached('ROUTER', 'LEADER')
161            pv.verify_attached('SED', 'ROUTER', 'MTD')
162            pv.verify_attached('MED', 'ROUTER', 'MTD')
163            pv.verify_attached('DUT', 'ROUTER', 'FTD-ED')
164
165        # Step 2: Leader sends DIAG_GET.qry to the Realm-Local All-Thread-Nodes
166        #         multicast address containing the requested diagnostic TLVs:
167        #         CoAP Response Code
168        #             2.04 Changed
169        #         CoAP Payload
170        #             TLV Type 0 - MAC Extended Address (64- bit)
171        #             TLV Type 1 - MAC Address (16-bit)
172        #             TLV Type 2 - Mode (Capability information)
173        #             TLV Type 6 – Leader Data
174        #             TLV Type 7 – Network Data
175        #             TLV Type 8 – IPv6 address list
176        #             TLV Type 17 – Channel Pagesi
177        #
178        #         if DUT is Router, contianing the following as well:
179        #             TLV Type 4 – Connectivity
180        #             TLV Type 5 – Route64
181        #             TLV Type 16 – Child Table
182        _qr_pkt = pkts.filter_wpan_src64(LEADER).\
183            filter_ipv6_dst(REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS).\
184            filter_coap_request(DIAG_GET_QRY_URI).\
185            must_next()
186        dut_payload_tlvs = {
187            DG_TYPE_LIST_TLV, DG_MAC_EXTENDED_ADDRESS_TLV, DG_MAC_ADDRESS_TLV, DG_MODE_TLV, DG_LEADER_DATA_TLV,
188            DG_NETWORK_DATA_TLV, DG_IPV6_ADDRESS_LIST_TLV, DG_CHANNEL_PAGES_TLV
189        }
190        if self.TOPOLOGY[ROUTER1]['name'] == 'DUT':
191            dut_payload_tlvs.update({DG_CONNECTIVITY_TLV, DG_ROUTE64_TLV, DG_CHILD_TABLE_TLV})
192            _qr_pkt.must_verify(lambda p: dut_payload_tlvs == set(p.thread_diagnostic.tlv.type))
193
194            # Step 3: The DUT automatically responds with a DIAG_GET.ans response
195            #         MUST contain the requested diagnostic TLVs:
196            #             TLV Type 0 - MAC Extended Address (64- bit)
197            #             TLV Type 1 - MAC Address (16-bit)
198            #             TLV Type 2 - Mode (Capability information)
199            #             TLV Type 4 – Connectivity
200            #             TLV Type 5 – Route64
201            #             TLV Type 6 – Leader Data
202            #             TLV Type 7 – Network Data
203            #             TLV Type 8 – IPv6 address list
204            #             TLV Type 16 – Child Table
205            #             TLV Type 17 – Channel Pages
206            dut_payload_tlvs.remove(DG_TYPE_LIST_TLV)
207            pkts.filter_wpan_src64(DUT).\
208                filter_ipv6_dst(LEADER_RLOC).\
209                filter_coap_request(DIAG_GET_ANS_URI).\
210                filter(lambda p:
211                       dut_payload_tlvs == set(p.thread_diagnostic.tlv.type) and\
212                       {str(p.wpan.src64), colon_hex(dut_addr16, 2), '0f'}
213                       < set(p.thread_diagnostic.tlv.general)
214                      ).\
215                must_next()
216
217            # Step 4: The DUT MUST use IEEE 802.15.4 indirect transmissions to forward
218            #         the DIAG_GET.query to SED
219            dut_payload_tlvs.add(DG_TYPE_LIST_TLV)
220            pkts.filter_wpan_src64(DUT).\
221                filter_ipv6_dst(REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS).\
222                filter_coap_request(DIAG_GET_QRY_URI).\
223                filter(lambda p:
224                       dut_payload_tlvs == set(p.thread_diagnostic.tlv.type)
225                      ).\
226                must_next()
227
228        else:
229            # Step 5: The DUT automatically responds with a DIAG_GET.ans response
230            #         MUST contain the requested diagnostic TLVs:
231            #             TLV Type 0 - MAC Extended Address (64- bit)
232            #             TLV Type 1 - MAC Address (16-bit)
233            #             TLV Type 2 - Mode (Capability information)
234            #             TLV Type 6 – Leader Data
235            #             TLV Type 7 – Network Data
236            #             TLV Type 8 – IPv6 address list
237            #             TLV Type 17 – Channel Pages
238            dut_payload_tlvs.remove(DG_TYPE_LIST_TLV)
239            pkts.filter_wpan_src64(DUT).\
240                filter_ipv6_dst(LEADER_RLOC).\
241                filter_coap_request(DIAG_GET_ANS_URI).\
242                filter(lambda p:
243                       dut_payload_tlvs == set(p.thread_diagnostic.tlv.type) and\
244                       {str(p.wpan.src64), colon_hex(dut_addr16, 2), '0f'}
245                       < set(p.thread_diagnostic.tlv.general)
246                      ).\
247                must_next()
248
249
250class Cert_5_7_03_CoapDiagCommands_Base_ROUTER(Cert_5_7_03_CoapDiagCommands_Base):
251    TOPOLOGY = copy.deepcopy(Cert_5_7_03_CoapDiagCommands_Base.TOPOLOGY)
252    TOPOLOGY[ROUTER1]['name'] = 'DUT'
253    TOPOLOGY[FED1]['name'] = 'FED'
254
255
256class Cert_5_7_03_CoapDiagCommands_Base_FED(Cert_5_7_03_CoapDiagCommands_Base):
257    TOPOLOGY = copy.deepcopy(Cert_5_7_03_CoapDiagCommands_Base.TOPOLOGY)
258    TOPOLOGY[ROUTER1]['name'] = 'ROUTER'
259    TOPOLOGY[FED1]['name'] = 'DUT'
260
261
262del (Cert_5_7_03_CoapDiagCommands_Base)
263
264if __name__ == '__main__':
265    unittest.main()
266