1#!/usr/bin/env python3
2# Copyright (c) 2023 Intel Corporation
3#
4# SPDX-License-Identifier: Apache-2.0
5"""
6Tests for scl.py functions
7"""
8
9import logging
10import mock
11import os
12import pytest
13import sys
14
15ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
16sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
17
18import scl
19
20from contextlib import nullcontext
21from importlib import reload
22from pykwalify.errors import SchemaError
23from yaml.scanner import ScannerError
24
25
26TESTDATA_1 = [
27    (False),
28    (True),
29]
30
31@pytest.mark.parametrize(
32    'fail_c',
33    TESTDATA_1,
34    ids=['C YAML', 'non-C YAML']
35)
36def test_yaml_imports(fail_c):
37    class ImportRaiser:
38        def find_spec(self, fullname, path, target=None):
39            if fullname == 'yaml.CLoader' and fail_c:
40                raise ImportError()
41            if fullname == 'yaml.CSafeLoader' and fail_c:
42                raise ImportError()
43            if fullname == 'yaml.CDumper' and fail_c:
44                raise ImportError()
45
46    modules_mock = sys.modules.copy()
47
48    if hasattr(modules_mock['yaml'], 'CLoader'):
49        del modules_mock['yaml'].CLoader
50        del modules_mock['yaml'].CSafeLoader
51        del modules_mock['yaml'].CDumper
52
53    cloader_mock = mock.Mock()
54    loader_mock = mock.Mock()
55    csafeloader_mock = mock.Mock()
56    safeloader_mock = mock.Mock()
57    cdumper_mock = mock.Mock()
58    dumper_mock = mock.Mock()
59
60    if not fail_c:
61        modules_mock['yaml'].CLoader = cloader_mock
62        modules_mock['yaml'].CSafeLoader = csafeloader_mock
63        modules_mock['yaml'].CDumper = cdumper_mock
64
65    modules_mock['yaml'].Loader = loader_mock
66    modules_mock['yaml'].SafeLoader = safeloader_mock
67    modules_mock['yaml'].Dumper = dumper_mock
68
69    meta_path_mock = sys.meta_path[:]
70    meta_path_mock.insert(0, ImportRaiser())
71
72    with mock.patch.dict('sys.modules', modules_mock, clear=True), \
73         mock.patch('sys.meta_path', meta_path_mock):
74        reload(scl)
75
76    assert sys.modules['scl'].Loader == loader_mock if fail_c else \
77                                        cloader_mock
78
79    assert sys.modules['scl'].SafeLoader == safeloader_mock if fail_c else \
80                                        csafeloader_mock
81
82    assert sys.modules['scl'].Dumper == dumper_mock if fail_c else \
83                                        cdumper_mock
84
85    import yaml
86    reload(yaml)
87
88
89TESTDATA_2 = [
90    (False, logging.CRITICAL, []),
91    (True, None, ['can\'t import pykwalify; won\'t validate YAML']),
92]
93
94@pytest.mark.parametrize(
95    'fail_pykwalify, log_level, expected_logs',
96    TESTDATA_2,
97    ids=['pykwalify OK', 'no pykwalify']
98)
99def test_pykwalify_import(caplog, fail_pykwalify, log_level, expected_logs):
100    class ImportRaiser:
101        def find_spec(self, fullname, path, target=None):
102            if fullname == 'pykwalify.core' and fail_pykwalify:
103                raise ImportError()
104
105    modules_mock = sys.modules.copy()
106    modules_mock['pykwalify'] = None if fail_pykwalify else \
107                                modules_mock['pykwalify']
108
109    meta_path_mock = sys.meta_path[:]
110    meta_path_mock.insert(0, ImportRaiser())
111
112    with mock.patch.dict('sys.modules', modules_mock, clear=True), \
113         mock.patch('sys.meta_path', meta_path_mock):
114        reload(scl)
115
116    if log_level:
117        assert logging.getLogger('pykwalify.core').level == log_level
118
119    assert all([log in caplog.text for log in expected_logs])
120
121    if fail_pykwalify:
122        assert scl._yaml_validate(None, None) is None
123        assert scl._yaml_validate(mock.Mock(), mock.Mock()) is None
124
125    reload(scl)
126
127
128TESTDATA_3 = [
129    (False),
130    (True),
131]
132
133@pytest.mark.parametrize(
134    'fail_parsing',
135    TESTDATA_3,
136    ids=['ok', 'parsing error']
137)
138def test_yaml_load(caplog, fail_parsing):
139    result_mock = mock.Mock()
140
141    def mock_load(*args, **kwargs):
142        if fail_parsing:
143            context_mark = mock.Mock()
144            problem_mark = mock.Mock()
145            type(context_mark).args = mock.PropertyMock(return_value=[])
146            type(context_mark).name = 'dummy context mark'
147            type(context_mark).line = 0
148            type(context_mark).column = 0
149            type(problem_mark).args = mock.PropertyMock(return_value=[])
150            type(problem_mark).name = 'dummy problem mark'
151            type(problem_mark).line = 0
152            type(problem_mark).column = 0
153            raise ScannerError(context='dummy context',
154                               context_mark=context_mark, problem='dummy problem',
155                               problem_mark=problem_mark, note='Dummy note')
156        return result_mock
157
158    filename = 'dummy/file.yaml'
159
160    with mock.patch('yaml.load', side_effect=mock_load), \
161         mock.patch('builtins.open', mock.mock_open()) as mock_file:
162        with pytest.raises(ScannerError) if fail_parsing else nullcontext():
163            result = scl.yaml_load(filename)
164
165    mock_file.assert_called_with('dummy/file.yaml', 'r', encoding='utf-8')
166
167    if not fail_parsing:
168        assert result == result_mock
169    else:
170        assert 'dummy problem mark:0:0: error: dummy problem' \
171               ' (note Dummy note context @dummy context mark:0:0' \
172               ' dummy context)' in caplog.text
173
174
175
176TESTDATA_4 = [
177    (True, False, None),
178    (False, False, SchemaError),
179    (False, True, ScannerError),
180]
181
182@pytest.mark.parametrize(
183    'validate, fail_load, expected_error',
184    TESTDATA_4,
185    ids=['successful validation', 'failed validation', 'failed load']
186)
187def test_yaml_load_verify(validate, fail_load, expected_error):
188    filename = 'dummy/file.yaml'
189    schema_mock = mock.Mock()
190    data_mock = mock.Mock()
191
192    def mock_load(file_name, *args, **kwargs):
193        assert file_name == filename
194        if fail_load:
195            raise ScannerError
196        return data_mock
197
198    def mock_validate(data, schema, *args, **kwargs):
199        assert data == data_mock
200        assert schema == schema_mock
201        if validate:
202            return True
203        raise SchemaError(u'Schema validation failed.')
204
205    with mock.patch('scl.yaml_load', side_effect=mock_load), \
206         mock.patch('scl._yaml_validate', side_effect=mock_validate), \
207         pytest.raises(expected_error) if expected_error else nullcontext():
208        res = scl.yaml_load_verify(filename, schema_mock)
209
210    if validate:
211        assert res == data_mock
212
213
214TESTDATA_5 = [
215    (True, True, None),
216    (True, False, SchemaError),
217    (False, None, None),
218]
219
220@pytest.mark.parametrize(
221    'schema_exists, validate, expected_error',
222    TESTDATA_5,
223    ids=['successful validation', 'failed validation', 'no schema']
224)
225def test_yaml_validate(schema_exists, validate, expected_error):
226    data_mock = mock.Mock()
227    schema_mock = mock.Mock() if schema_exists else None
228
229    def mock_validate(raise_exception, *args, **kwargs):
230        assert raise_exception
231        if validate:
232            return True
233        raise SchemaError(u'Schema validation failed.')
234
235    def mock_core(source_data, schema_data, *args, **kwargs):
236        assert source_data == data_mock
237        assert schema_data == schema_mock
238        return mock.Mock(validate=mock_validate)
239
240    core_mock = mock.Mock(side_effect=mock_core)
241
242    with mock.patch('pykwalify.core.Core', core_mock), \
243         pytest.raises(expected_error) if expected_error else nullcontext():
244        scl._yaml_validate(data_mock, schema_mock)
245
246    if schema_exists:
247        core_mock.assert_called_once()
248    else:
249        core_mock.assert_not_called()
250
251
252def test_yaml_load_empty_file(tmp_path):
253    quarantine_file = tmp_path / 'empty_quarantine.yml'
254    quarantine_file.write_text("# yaml file without data")
255    with pytest.raises(scl.EmptyYamlFileException):
256        scl.yaml_load_verify(quarantine_file, None)
257