1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2022 Intel Corporation
4# SPDX-License-Identifier: Apache-2.0
5
6import os
7from pathlib import Path
8import re
9import logging
10import contextlib
11import mmap
12import glob
13from typing import List
14from twisterlib.mixins import DisablePyTestCollectionMixin
15from twisterlib.environment import canonical_zephyr_base
16from twisterlib.error import TwisterException, TwisterRuntimeError
17
18logger = logging.getLogger('twister')
19logger.setLevel(logging.DEBUG)
20
21class ScanPathResult:
22    """Result of the scan_tesuite_path function call.
23
24    Attributes:
25        matches                          A list of test cases
26        warnings                         A string containing one or more
27                                         warnings to display
28        has_registered_test_suites       Whether or not the path contained any
29                                         calls to the ztest_register_test_suite
30                                         macro.
31        has_run_registered_test_suites   Whether or not the path contained at
32                                         least one call to
33                                         ztest_run_registered_test_suites.
34        has_test_main                    Whether or not the path contains a
35                                         definition of test_main(void)
36        ztest_suite_names                Names of found ztest suites
37    """
38    def __init__(self,
39                 matches: List[str] = None,
40                 warnings: str = None,
41                 has_registered_test_suites: bool = False,
42                 has_run_registered_test_suites: bool = False,
43                 has_test_main: bool = False,
44                 ztest_suite_names: List[str] = []):
45        self.matches = matches
46        self.warnings = warnings
47        self.has_registered_test_suites = has_registered_test_suites
48        self.has_run_registered_test_suites = has_run_registered_test_suites
49        self.has_test_main = has_test_main
50        self.ztest_suite_names = ztest_suite_names
51
52    def __eq__(self, other):
53        if not isinstance(other, ScanPathResult):
54            return False
55        return (sorted(self.matches) == sorted(other.matches) and
56                self.warnings == other.warnings and
57                (self.has_registered_test_suites ==
58                 other.has_registered_test_suites) and
59                (self.has_run_registered_test_suites ==
60                 other.has_run_registered_test_suites) and
61                self.has_test_main == other.has_test_main and
62                (sorted(self.ztest_suite_names) ==
63                 sorted(other.ztest_suite_names)))
64
65def scan_file(inf_name):
66    regular_suite_regex = re.compile(
67        # do not match until end-of-line, otherwise we won't allow
68        # stc_regex below to catch the ones that are declared in the same
69        # line--as we only search starting the end of this match
70        br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
71        re.MULTILINE)
72    registered_suite_regex = re.compile(
73        br"^\s*ztest_register_test_suite"
74        br"\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
75        re.MULTILINE)
76    new_suite_regex = re.compile(
77        br"^\s*ZTEST_SUITE\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
78        re.MULTILINE)
79    testcase_regex = re.compile(
80        br"^\s*(?:ZTEST|ZTEST_F|ZTEST_USER|ZTEST_USER_F)\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,"
81        br"\s*(?P<testcase_name>[a-zA-Z0-9_]+)\s*",
82        re.MULTILINE)
83    # Checks if the file contains a definition of "void test_main(void)"
84    # Since ztest provides a plain test_main implementation it is OK to:
85    # 1. register test suites and not call the run function iff the test
86    #    doesn't have a custom test_main.
87    # 2. register test suites and a custom test_main definition iff the test
88    #    also calls ztest_run_registered_test_suites.
89    test_main_regex = re.compile(
90        br"^\s*void\s+test_main\(void\)",
91        re.MULTILINE)
92    registered_suite_run_regex = re.compile(
93        br"^\s*ztest_run_registered_test_suites\("
94        br"(\*+|&)?(?P<state_identifier>[a-zA-Z0-9_]+)\)",
95        re.MULTILINE)
96
97    warnings = None
98    has_registered_test_suites = False
99    has_run_registered_test_suites = False
100    has_test_main = False
101
102    with open(inf_name) as inf:
103        if os.name == 'nt':
104            mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ}
105        else:
106            mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE, 'prot': mmap.PROT_READ,
107                            'offset': 0}
108
109        with contextlib.closing(mmap.mmap(**mmap_args)) as main_c:
110            regular_suite_regex_matches = \
111                [m for m in regular_suite_regex.finditer(main_c)]
112            registered_suite_regex_matches = \
113                [m for m in registered_suite_regex.finditer(main_c)]
114            new_suite_testcase_regex_matches = \
115                [m for m in testcase_regex.finditer(main_c)]
116            new_suite_regex_matches = \
117                [m for m in new_suite_regex.finditer(main_c)]
118
119            if registered_suite_regex_matches:
120                has_registered_test_suites = True
121            if registered_suite_run_regex.search(main_c):
122                has_run_registered_test_suites = True
123            if test_main_regex.search(main_c):
124                has_test_main = True
125
126            if regular_suite_regex_matches:
127                ztest_suite_names = \
128                    _extract_ztest_suite_names(regular_suite_regex_matches)
129                testcase_names, warnings = \
130                    _find_regular_ztest_testcases(main_c, regular_suite_regex_matches, has_registered_test_suites)
131            elif registered_suite_regex_matches:
132                ztest_suite_names = \
133                    _extract_ztest_suite_names(registered_suite_regex_matches)
134                testcase_names, warnings = \
135                    _find_regular_ztest_testcases(main_c, registered_suite_regex_matches, has_registered_test_suites)
136            elif new_suite_regex_matches or new_suite_testcase_regex_matches:
137                ztest_suite_names = \
138                    _extract_ztest_suite_names(new_suite_regex_matches)
139                testcase_names, warnings = \
140                    _find_new_ztest_testcases(main_c)
141            else:
142                # can't find ztest_test_suite, maybe a client, because
143                # it includes ztest.h
144                ztest_suite_names = []
145                testcase_names, warnings = None, None
146
147            return ScanPathResult(
148                matches=testcase_names,
149                warnings=warnings,
150                has_registered_test_suites=has_registered_test_suites,
151                has_run_registered_test_suites=has_run_registered_test_suites,
152                has_test_main=has_test_main,
153                ztest_suite_names=ztest_suite_names)
154
155def _extract_ztest_suite_names(suite_regex_matches):
156    ztest_suite_names = \
157        [m.group("suite_name") for m in suite_regex_matches]
158    ztest_suite_names = \
159        [name.decode("UTF-8") for name in ztest_suite_names]
160    return ztest_suite_names
161
162def _find_regular_ztest_testcases(search_area, suite_regex_matches, is_registered_test_suite):
163    """
164    Find regular ztest testcases like "ztest_unit_test" or similar. Return
165    testcases' names and eventually found warnings.
166    """
167    testcase_regex = re.compile(
168        br"""^\s*  # empty space at the beginning is ok
169        # catch the case where it is declared in the same sentence, e.g:
170        #
171        # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME));
172        # ztest_register_test_suite(n, p, ztest_user_unit_test(TESTNAME),
173        (?:ztest_
174            (?:test_suite\(|register_test_suite\([a-zA-Z0-9_]+\s*,\s*)
175            [a-zA-Z0-9_]+\s*,\s*
176        )?
177        # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME)
178        ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?
179        # Consume the argument that becomes the extra testcase
180        \(\s*(?P<testcase_name>[a-zA-Z0-9_]+)
181        # _setup_teardown() variant has two extra arguments that we ignore
182        (?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?
183        \s*\)""",
184        # We don't check how it finishes; we don't care
185        re.MULTILINE | re.VERBOSE)
186    achtung_regex = re.compile(
187        br"(#ifdef|#endif)",
188        re.MULTILINE)
189
190    search_start, search_end = \
191        _get_search_area_boundary(search_area, suite_regex_matches, is_registered_test_suite)
192    limited_search_area = search_area[search_start:search_end]
193    testcase_names, warnings = \
194        _find_ztest_testcases(limited_search_area, testcase_regex)
195
196    achtung_matches = re.findall(achtung_regex, limited_search_area)
197    if achtung_matches and warnings is None:
198        achtung = ", ".join(sorted({match.decode() for match in achtung_matches},reverse = True))
199        warnings = f"found invalid {achtung} in ztest_test_suite()"
200
201    return testcase_names, warnings
202
203
204def _get_search_area_boundary(search_area, suite_regex_matches, is_registered_test_suite):
205    """
206    Get search area boundary based on "ztest_test_suite(...)",
207    "ztest_register_test_suite(...)" or "ztest_run_test_suite(...)"
208    functions occurrence.
209    """
210    suite_run_regex = re.compile(
211        br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)",
212        re.MULTILINE)
213
214    search_start = suite_regex_matches[0].end()
215
216    suite_run_match = suite_run_regex.search(search_area)
217    if suite_run_match:
218        search_end = suite_run_match.start()
219    elif not suite_run_match and not is_registered_test_suite:
220        raise ValueError("can't find ztest_run_test_suite")
221    else:
222        search_end = re.compile(br"\);", re.MULTILINE) \
223            .search(search_area, search_start) \
224            .end()
225
226    return search_start, search_end
227
228def _find_new_ztest_testcases(search_area):
229    """
230    Find regular ztest testcases like "ZTEST", "ZTEST_F" etc. Return
231    testcases' names and eventually found warnings.
232    """
233    testcase_regex = re.compile(
234        br"^\s*(?:ZTEST|ZTEST_F|ZTEST_USER|ZTEST_USER_F)\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,"
235        br"\s*(?P<testcase_name>[a-zA-Z0-9_]+)\s*",
236        re.MULTILINE)
237
238    return _find_ztest_testcases(search_area, testcase_regex)
239
240def _find_ztest_testcases(search_area, testcase_regex):
241    """
242    Parse search area and try to find testcases defined in testcase_regex
243    argument. Return testcase names and eventually found warnings.
244    """
245    testcase_regex_matches = \
246        [m for m in testcase_regex.finditer(search_area)]
247    testcase_names = \
248        [m.group("testcase_name") for m in testcase_regex_matches]
249    testcase_names = [name.decode("UTF-8") for name in testcase_names]
250    warnings = None
251    for testcase_name in testcase_names:
252        if not testcase_name.startswith("test_"):
253            warnings = "Found a test that does not start with test_"
254    testcase_names = \
255        [tc_name.replace("test_", "", 1) for tc_name in testcase_names]
256
257    return testcase_names, warnings
258
259def find_c_files_in(path: str, extensions: list = ['c', 'cpp', 'cxx', 'cc']) -> list:
260    """
261    Find C or C++ sources in the directory specified by "path"
262    """
263    if not os.path.isdir(path):
264        return []
265
266    # back up previous CWD
267    oldpwd = os.getcwd()
268    os.chdir(path)
269
270    filenames = []
271    for ext in extensions:
272        # glob.glob('**/*.c') does not pick up the base directory
273        filenames += [os.path.join(path, x) for x in glob.glob(f'*.{ext}')]
274        # glob matches in subdirectories too
275        filenames += [os.path.join(path, x) for x in glob.glob(f'**/*.{ext}')]
276
277    # restore previous CWD
278    os.chdir(oldpwd)
279
280    return filenames
281
282def scan_testsuite_path(testsuite_path):
283    subcases = []
284    has_registered_test_suites = False
285    has_run_registered_test_suites = False
286    has_test_main = False
287    ztest_suite_names = []
288
289    src_dir_path = _find_src_dir_path(testsuite_path)
290    for filename in find_c_files_in(src_dir_path):
291        if os.stat(filename).st_size == 0:
292            continue
293        try:
294            result: ScanPathResult = scan_file(filename)
295            if result.warnings:
296                logger.error("%s: %s" % (filename, result.warnings))
297                raise TwisterRuntimeError(
298                    "%s: %s" % (filename, result.warnings))
299            if result.matches:
300                subcases += result.matches
301            if result.has_registered_test_suites:
302                has_registered_test_suites = True
303            if result.has_run_registered_test_suites:
304                has_run_registered_test_suites = True
305            if result.has_test_main:
306                has_test_main = True
307            if result.ztest_suite_names:
308                ztest_suite_names += result.ztest_suite_names
309
310        except ValueError as e:
311            logger.error("%s: error parsing source file: %s" % (filename, e))
312
313    for filename in find_c_files_in(testsuite_path):
314        try:
315            result: ScanPathResult = scan_file(filename)
316            if result.warnings:
317                logger.error("%s: %s" % (filename, result.warnings))
318            if result.matches:
319                subcases += result.matches
320            if result.ztest_suite_names:
321                ztest_suite_names += result.ztest_suite_names
322        except ValueError as e:
323            logger.error("%s: can't find: %s" % (filename, e))
324
325    if (has_registered_test_suites and has_test_main and
326            not has_run_registered_test_suites):
327        warning = \
328            "Found call to 'ztest_register_test_suite()' but no "\
329            "call to 'ztest_run_registered_test_suites()'"
330        logger.error(warning)
331        raise TwisterRuntimeError(warning)
332
333    return subcases, ztest_suite_names
334
335def _find_src_dir_path(test_dir_path):
336    """
337    Try to find src directory with test source code. Sometimes due to the
338    optimization reasons it is placed in upper directory.
339    """
340    src_dir_name = "src"
341    src_dir_path = os.path.join(test_dir_path, src_dir_name)
342    if os.path.isdir(src_dir_path):
343        return src_dir_path
344    src_dir_path = os.path.join(test_dir_path, "..", src_dir_name)
345    if os.path.isdir(src_dir_path):
346        return src_dir_path
347    return ""
348
349class TestCase(DisablePyTestCollectionMixin):
350
351    def __init__(self, name=None, testsuite=None):
352        self.duration = 0
353        self.name = name
354        self.status = None
355        self.reason = None
356        self.testsuite = testsuite
357        self.output = ""
358        self.freeform = False
359
360    def __lt__(self, other):
361        return self.name < other.name
362
363    def __repr__(self):
364        return "<TestCase %s with %s>" % (self.name, self.status)
365
366    def __str__(self):
367        return self.name
368
369class TestSuite(DisablePyTestCollectionMixin):
370    """Class representing a test application
371    """
372
373    def __init__(self, suite_root, suite_path, name, data=None, detailed_test_id=True):
374        """TestSuite constructor.
375
376        This gets called by TestPlan as it finds and reads test yaml files.
377        Multiple TestSuite instances may be generated from a single testcase.yaml,
378        each one corresponds to an entry within that file.
379
380        We need to have a unique name for every single test case. Since
381        a testcase.yaml can define multiple tests, the canonical name for
382        the test case is <workdir>/<name>.
383
384        @param testsuite_root os.path.abspath() of one of the --testsuite-root
385        @param suite_path path to testsuite
386        @param name Name of this test case, corresponding to the entry name
387            in the test case configuration file. For many test cases that just
388            define one test, can be anything and is usually "test". This is
389            really only used to distinguish between different cases when
390            the testcase.yaml defines multiple tests
391        """
392
393        workdir = os.path.relpath(suite_path, suite_root)
394
395        assert self.check_suite_name(name, suite_root, workdir)
396        self.detailed_test_id = detailed_test_id
397        self.name = self.get_unique(suite_root, workdir, name) if self.detailed_test_id else name
398        self.id = name
399
400        self.source_dir = suite_path
401        self.source_dir_rel = os.path.relpath(os.path.realpath(suite_path), start=canonical_zephyr_base)
402        self.yamlfile = suite_path
403        self.testcases = []
404
405        self.ztest_suite_names = []
406
407        if data:
408            self.load(data)
409
410
411    def load(self, data):
412        for k, v in data.items():
413            if k != "testcases":
414                setattr(self, k, v)
415
416        if self.harness == 'console' and not self.harness_config:
417            raise Exception('Harness config error: console harness defined without a configuration.')
418
419    def add_subcases(self, data, parsed_subcases, suite_names):
420        testcases = data.get("testcases", [])
421        if testcases:
422            for tc in testcases:
423                self.add_testcase(name=f"{self.id}.{tc}")
424        else:
425            # only add each testcase once
426            for sub in set(parsed_subcases):
427                name = "{}.{}".format(self.id, sub)
428                self.add_testcase(name)
429
430            if not parsed_subcases:
431                self.add_testcase(self.id, freeform=True)
432
433        self.ztest_suite_names = suite_names
434
435    def add_testcase(self, name, freeform=False):
436        tc = TestCase(name=name, testsuite=self)
437        tc.freeform = freeform
438        self.testcases.append(tc)
439
440    @staticmethod
441    def get_unique(testsuite_root, workdir, name):
442
443        canonical_testsuite_root = os.path.realpath(testsuite_root)
444        if Path(canonical_zephyr_base) in Path(canonical_testsuite_root).parents:
445            # This is in ZEPHYR_BASE, so include path in name for uniqueness
446            # FIXME: We should not depend on path of test for unique names.
447            relative_ts_root = os.path.relpath(canonical_testsuite_root,
448                                               start=canonical_zephyr_base)
449        else:
450            relative_ts_root = ""
451
452        # workdir can be "."
453        unique = os.path.normpath(os.path.join(relative_ts_root, workdir, name))
454        return unique
455
456    @staticmethod
457    def check_suite_name(name, testsuite_root, workdir):
458        check = name.split(".")
459        if len(check) < 2:
460            raise TwisterException(f"""bad test name '{name}' in {testsuite_root}/{workdir}. \
461Tests should reference the category and subsystem with a dot as a separator.
462                    """
463                    )
464        return True
465