1# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15""" IDF Test Applications """
16import hashlib
17import json
18import os
19import re
20import subprocess
21import sys
22from abc import abstractmethod
23
24from tiny_test_fw import App
25
26from .IDFAssignTest import ComponentUTGroup, ExampleGroup, IDFCaseGroup, TestAppsGroup, UnitTestGroup
27
28try:
29    import gitlab_api
30except ImportError:
31    gitlab_api = None
32
33try:
34    from typing import Any, Dict, List, Optional, Tuple, Type  # noqa: F401
35except ImportError:
36    pass
37
38
def parse_encrypted_flag(args, offs, binary):  # type: (Dict, str, str) -> Any
    """
    Look up the ``encrypted`` flag of the entry flashed at a given offset.

    :param args: mapping whose values are scanned for entries that carry both an
                 ``offset`` and a ``file`` field
    :param offs: offset to look for, compared as-is with the entry's ``offset``
    :param binary: file name to look for, compared as-is with the entry's ``file``
    :return: True if the first usable matching entry has ``encrypted`` equal to the
             string ``'true'``, False if it has any other value, None if no matching
             entry with an ``encrypted`` field exists. On None the caller has to fall
             back to CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT.
    """
    wanted = (offs, binary)
    for candidate in args.values():
        try:
            is_match = (candidate['offset'], candidate['file']) == wanted
            if is_match:
                return candidate['encrypted'] == 'true'
        except (TypeError, KeyError):
            # TypeError: the value is not a mapping (e.g. a list or a string).
            # KeyError: the value lacks "offset", "file" or "encrypted".
            continue

    return None
56
57
def parse_flash_settings(path, default_encryption=False):  # type: (str, bool) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]], Dict, Any]
    """
    Parse a flash arguments file produced by the build.

    Two formats are supported, selected by the base name of ``path``:

    * ``flasher_args.json`` (CMake build metadata)
    * anything else is treated as a GNU Make ``download.config`` file, whose last
      line holds whitespace-separated ``--option value`` and ``offset file`` pairs

    :param path: path of the flash arguments file
    :param default_encryption: whether files without an explicit ``encrypted`` flag
                               must be encrypted
    :return: tuple ``(flash_files, encrypt_files, flash_settings, app_name)``.
             ``flash_files`` lists every ``(offset, file)`` pair to flash,
             ``encrypt_files`` the subset that needs encryption. ``app_name`` is the
             application file name without extension, or None when it cannot be
             guessed from ``download.config``.
    """
    file_name = os.path.basename(path)

    # For compatibility reasons, this list contains all the files to be
    # flashed
    flash_files = []
    # The following list only contains the files that need encryption
    encrypt_files = []

    if file_name == 'flasher_args.json':
        # CMake version using build metadata file
        with open(path, 'r') as f:
            args = json.load(f)

        for (offs, binary) in args['flash_files'].items():
            if offs:
                flash_files.append((offs, binary))
                encrypted = parse_encrypted_flag(args, offs, binary)

                # default_encryption should be taken into account if and only if
                # encrypted flag is not provided in the JSON file.
                if (encrypted is None and default_encryption) or encrypted:
                    encrypt_files.append((offs, binary))

        flash_settings = args['flash_settings']
        app_name = os.path.splitext(args['app']['file'])[0]
    else:
        # GNU Make version uses download.config arguments file
        with open(path, 'r') as f:
            # split() without a separator discards the trailing newline and
            # collapses repeated blanks, so the last file name stays clean and
            # the option/value pairs cannot get shifted by an empty token.
            args = f.readlines()[-1].split()

        flash_settings = {}
        for idx in range(0, len(args), 2):  # process arguments in pairs
            if args[idx].startswith('--'):
                # strip the -- from the command line argument
                flash_settings[args[idx][2:]] = args[idx + 1]
            else:
                # offs, filename
                flash_files.append((args[idx], args[idx + 1]))

        # Parameter default_encryption tells us if the files need encryption.
        # Copy the list so that the two results never alias each other.
        if default_encryption:
            encrypt_files = list(flash_files)

        # we can only guess app name in download.config.
        app_name = None
        for _, binary in flash_files:
            if not os.path.dirname(binary) and 'partition' not in binary:
                # app bin usually in the same dir with download.config and it's not partition table
                app_name = os.path.splitext(binary)[0]
                break
    return flash_files, encrypt_files, flash_settings, app_name
108
109
class Artifacts(object):
    """
    Downloads the build artifacts of one application from a GitLab CI job.

    The job and the paths inside it are looked up in an artifact index file
    (JSON list of entries with ``app_dir``, ``config``, ``target``, ``work_dir``,
    ``build_dir``, ``build_system`` and ``ci_job_id`` fields, as read below).
    """

    def __init__(self, dest_root_path, artifact_index_file, app_path, config_name, target):
        # type: (str, str, str, str, str) -> None
        """
        :param dest_root_path: directory passed to the GitLab helper as download destination
        :param artifact_index_file: path of the JSON artifact index
        :param app_path: application path used to match ``app_dir`` (may be empty)
        :param config_name: configuration name used to match ``config`` (may be empty)
        :param target: target name used to match ``target`` (may be empty)
        """
        assert gitlab_api
        # at least one of app_path or config_name is not None. otherwise we can't match artifact
        assert app_path or config_name
        assert os.path.exists(artifact_index_file)
        self.gitlab_inst = gitlab_api.Gitlab(os.getenv('CI_PROJECT_ID'))
        self.dest_root_path = dest_root_path
        with open(artifact_index_file, 'r') as f:
            artifact_index = json.load(f)
        self.artifact_info = self._find_artifact(artifact_index, app_path, config_name, target)

    @staticmethod
    def _find_artifact(artifact_index, app_path, config_name, target):  # type: ignore
        """
        Return the first index entry matching every criterion that is set, or None.

        Criteria that are empty/None are not checked.
        """
        for artifact_info in artifact_index:
            # We use endswith here to avoid issue like:
            # examples_protocols_mqtt_ws but return a examples_protocols_mqtt_wss failure
            if app_path and not artifact_info['app_dir'].endswith(app_path):
                continue
            if config_name and not config_name == artifact_info['config']:
                continue
            if target and not target == artifact_info['target']:
                continue
            return artifact_info
        return None

    def _get_app_base_path(self):  # type: () -> Any
        """Return ``work_dir/build_dir`` of the matched artifact, or None if nothing matched."""
        if self.artifact_info:
            return os.path.join(self.artifact_info['work_dir'], self.artifact_info['build_dir'])
        else:
            return None

    def _get_flash_arg_file(self, base_path, job_id):  # type: (str, str) -> str
        """Download the flash arguments file of the build and return its path below ``base_path``."""
        if self.artifact_info['build_system'] == 'cmake':
            flash_arg_file = os.path.join(base_path, 'flasher_args.json')
        else:
            flash_arg_file = os.path.join(base_path, 'download.config')

        self.gitlab_inst.download_artifact(job_id, [flash_arg_file], self.dest_root_path)
        return flash_arg_file

    def _download_binary_files(self, base_path, job_id, flash_arg_file):  # type: (str, str, str) -> None
        """Download every file named by the flash arguments file, plus the elf and the bootloader."""
        # Let's ignore the second value returned (encrypt_files) as these
        # files also appear in the first list
        flash_files, _, _, app_name = parse_flash_settings(os.path.join(self.dest_root_path, flash_arg_file))
        # File names read from download.config may carry surrounding whitespace
        # (e.g. the trailing newline of the last token), so strip them.
        artifact_files = [os.path.join(base_path, p[1].strip()) for p in flash_files]
        # app_name is None when it could not be guessed from download.config;
        # there is no elf name to request in that case.
        if app_name:
            artifact_files.append(os.path.join(base_path, app_name + '.elf'))

        bootloader_path = os.path.join(base_path, 'bootloader', 'bootloader.bin')
        if bootloader_path not in artifact_files:
            artifact_files.append(bootloader_path)

        self.gitlab_inst.download_artifact(job_id, artifact_files, self.dest_root_path)

    def _download_sdkconfig_file(self, base_path, job_id):  # type: (str, str) -> None
        """Download the ``sdkconfig`` located in the parent directory of ``base_path``."""
        self.gitlab_inst.download_artifact(job_id, [os.path.join(os.path.dirname(base_path), 'sdkconfig')],
                                           self.dest_root_path)

    def download_artifacts(self):  # type: () -> Any
        """
        Download flash arguments, binaries and sdkconfig of the matched artifact.

        :return: the app base path inside the artifact, or None if no artifact matched
        """
        if not self.artifact_info:
            return None
        base_path = self._get_app_base_path()
        job_id = self.artifact_info['ci_job_id']
        # 1. download flash args file
        flash_arg_file = self._get_flash_arg_file(base_path, job_id)

        # 2. download all binary files
        self._download_binary_files(base_path, job_id, flash_arg_file)

        # 3. download sdkconfig file
        self._download_sdkconfig_file(base_path, job_id)
        return base_path

    def download_artifact_files(self, file_names):  # type: (List[str]) -> Any
        """
        Download the given files (relative to the build directory) and the sdkconfig.

        Paths are built here directly from ``work_dir``/``build_dir`` rather than
        through ``_get_app_base_path`` / ``_download_sdkconfig_file``.

        :param file_names: file names below the build directory
        :return: the build directory path inside the artifact, or None if no artifact matched
        """
        if not self.artifact_info:
            return None

        base_path = os.path.join(self.artifact_info['work_dir'], self.artifact_info['build_dir'])
        job_id = self.artifact_info['ci_job_id']

        # download all binary files
        artifact_files = [os.path.join(base_path, fn) for fn in file_names]
        self.gitlab_inst.download_artifact(job_id, artifact_files, self.dest_root_path)

        # download sdkconfig file
        self.gitlab_inst.download_artifact(job_id, [os.path.join(os.path.dirname(base_path), 'sdkconfig')],
                                           self.dest_root_path)
        return base_path
204
205
class UnitTestArtifacts(Artifacts):
    """
    Artifacts variant whose base path is ``app_dir`` joined with the build directory,
    where a leading ``builds/`` is replaced by ``output/``, and whose sdkconfig sits
    directly in that base path.
    """
    BUILDS_DIR_RE = re.compile(r'^builds/')

    def _get_app_base_path(self):  # type: () -> Any
        """Return the output directory below ``app_dir``, or None if no artifact matched."""
        info = self.artifact_info
        if not info:
            return None
        relocated_dir = self.BUILDS_DIR_RE.sub('output/', info['build_dir'])
        return os.path.join(info['app_dir'], relocated_dir)

    def _download_sdkconfig_file(self, base_path, job_id):  # type: (str, str) -> None
        """Download the ``sdkconfig`` stored directly inside ``base_path``."""
        sdkconfig_path = os.path.join(base_path, 'sdkconfig')
        self.gitlab_inst.download_artifact(job_id, [sdkconfig_path], self.dest_root_path)
218
219
class IDFApp(App.BaseApp):
    """
    Implements common esp-idf application behavior.
    idf applications should inherit from this class and overwrite method get_binary_path.

    The constructor resolves the binary directory, locates the elf file, then parses
    the flash arguments file and the partition table found in that directory.
    """

    IDF_DOWNLOAD_CONFIG_FILE = 'download.config'
    IDF_FLASH_ARGS_FILE = 'flasher_args.json'

    def __init__(self, app_path, config_name=None, target=None, case_group=IDFCaseGroup, artifact_cls=Artifacts):  # type: ignore
        """
        :param app_path: application path, forwarded to the base class
        :param config_name: build configuration name (may be None)
        :param target: chip target name (may be None)
        :param case_group: class providing ``get_artifact_index_file()``
        :param artifact_cls: class used to download artifacts when no local binary is found
        :raise AssertionError: if the binary directory holds neither flash arguments file
        """
        super(IDFApp, self).__init__(app_path)
        self.app_path = app_path  # type: (str)
        self.config_name = config_name  # type: (str)
        self.target = target  # type: (str)
        self.idf_path = self.get_sdk_path()  # type: (str)
        self.case_group = case_group
        self.artifact_cls = artifact_cls
        self.binary_path = self.get_binary_path()
        self.elf_file = self._get_elf_file_path()
        self._elf_file_sha256 = None  # type: (Optional[str])
        assert os.path.exists(self.binary_path)
        build_files = os.listdir(self.binary_path)
        if self.IDF_DOWNLOAD_CONFIG_FILE not in build_files and self.IDF_FLASH_ARGS_FILE not in build_files:
            msg = ('Neither {} nor {} exists. '
                   "Try to run 'make print_flash_cmd | tail -n 1 > {}/{}' "
                   "or 'idf.py build' "
                   'for resolving the issue.'
                   '').format(self.IDF_DOWNLOAD_CONFIG_FILE, self.IDF_FLASH_ARGS_FILE,
                              self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
            raise AssertionError(msg)

        # In order to keep backward compatibility, flash_files is unchanged.
        # However, we now have a new attribute encrypt_files.
        self.flash_files, self.encrypt_files, self.flash_settings = self._parse_flash_download_config()
        self.partition_table = self._parse_partition_table()

    def __str__(self):  # type: () -> str
        parts = ['app<{}>'.format(self.app_path)]
        if self.config_name:
            parts.append('config<{}>'.format(self.config_name))
        if self.target:
            parts.append('target<{}>'.format(self.target))
        return ' '.join(parts)

    @classmethod
    def get_sdk_path(cls):  # type: () -> str
        """Return the directory named by the ``IDF_PATH`` environment variable (must exist)."""
        idf_path = os.getenv('IDF_PATH')
        assert idf_path
        assert os.path.exists(idf_path)
        return idf_path

    def _get_sdkconfig_paths(self):  # type: () -> List[str]
        """
        returns list of possible paths where sdkconfig could be found

        Note: could be overwritten by a derived class to provide other locations or order
        """
        return [os.path.join(self.binary_path, 'sdkconfig'), os.path.join(self.binary_path, '..', 'sdkconfig')]

    def get_sdkconfig(self):  # type: () -> Dict
        """
        reads sdkconfig and returns a dictionary with all configured variables

        Every line containing ``=`` is split at the first ``=`` only, so values
        that themselves contain ``=`` are kept. Values are returned as written in
        the file, with trailing whitespace removed.

        :raise: AssertionError: if sdkconfig file does not exist in defined paths
        """
        d = {}
        sdkconfig_file = None
        for i in self._get_sdkconfig_paths():
            if os.path.exists(i):
                sdkconfig_file = i
                break
        assert sdkconfig_file is not None
        with open(sdkconfig_file) as f:
            for line in f:
                key, separator, value = line.partition('=')
                if separator:
                    d[key] = value.rstrip()
        return d

    def get_sdkconfig_config_value(self, config_key):    # type: (str) -> Any
        """Return the sdkconfig value stored for ``config_key``, or None if the key is absent."""
        return self.get_sdkconfig().get(config_key)

    @abstractmethod
    def _try_get_binary_from_local_fs(self):  # type: () -> Optional[str]
        """Return the local binary directory, or None/empty if there is none."""
        pass

    def get_binary_path(self):  # type: () -> str
        """
        Return the directory holding the application binaries.

        The local file system is tried first; otherwise the binaries are downloaded
        through ``artifact_cls``.

        :raise OSError: if the binaries can be obtained from neither source
        """
        path = self._try_get_binary_from_local_fs()
        if path:
            return path

        artifacts = self.artifact_cls(self.idf_path,
                                      self.case_group.get_artifact_index_file(),
                                      self.app_path, self.config_name, self.target)
        if isinstance(self, LoadableElfTestApp):
            assert self.app_files
            path = artifacts.download_artifact_files(self.app_files)
        else:
            path = artifacts.download_artifacts()

        if path:
            return os.path.join(self.idf_path, path)
        else:
            raise OSError('Failed to get binary for {}'.format(self))

    def _get_elf_file_path(self):  # type: () -> str
        """Return the path of the last ``.elf`` file listed in the binary directory, or ''."""
        ret = ''
        file_names = os.listdir(self.binary_path)
        for fn in file_names:
            if os.path.splitext(fn)[1] == '.elf':
                ret = os.path.join(self.binary_path, fn)
        return ret

    def _int_offs_abs_paths(self, files_list):  # type: (List[tuple[str, str]]) -> List[Tuple[int, str]]
        """Convert ``(offset string, file name)`` pairs to ``(int offset, path below binary_path)``."""
        return [(int(offs, 0),
                 os.path.join(self.binary_path, file_path.strip()))
                for (offs, file_path) in files_list]

    def _parse_flash_download_config(self):  # type: () -> Tuple[List[tuple[int, str]], List[tuple[int, str]], Dict]
        """
        Parse flash download config from build metadata files

        Nothing is stored on the instance here; the constructor assigns the
        returned values.

        (Called from constructor)

        Returns (flash_files, encrypt_files, flash_settings)
        """

        if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path):
            # CMake version using build metadata file
            path = os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE)
        else:
            # GNU Make version uses download.config arguments file
            path = os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)

        # If the JSON doesn't find the encrypted flag for our files, provide
        # a default encryption flag: the macro
        # CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT
        sdkconfig_dict = self.get_sdkconfig()
        default_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict

        flash_files, encrypt_files, flash_settings, _ = parse_flash_settings(path, default_encryption)

        # Flash setting "encrypt" is set if and only if all the files to flash
        # must be encrypted. Else, this parameter should be False.
        # All files must be encrypted if both file lists are the same
        flash_settings['encrypt'] = sorted(flash_files) == sorted(encrypt_files)

        return self._int_offs_abs_paths(flash_files), self._int_offs_abs_paths(encrypt_files), flash_settings

    def _parse_partition_table(self):  # type: ignore
        """
        Parse partition table contents based on app binaries

        Runs ``gen_esp32part.py`` on each flash file whose name contains
        ``partition`` until one run produces no traceback, then parses its output.

        Returns partition_table data: a dict mapping partition name to a dict with
        ``type``, ``subtype``, ``offset``, ``size`` and ``flags``.

        :raise ValueError: if no flash file could be parsed as a partition table

        (Called from constructor)
        """
        partition_tool = os.path.join(self.idf_path,
                                      'components',
                                      'partition_table',
                                      'gen_esp32part.py')
        assert os.path.exists(partition_tool)

        errors = []
        # self.flash_files is sorted based on offset in order to have a consistent result with different versions of
        # Python
        for (_, path) in sorted(self.flash_files, key=lambda elem: elem[0]):
            if 'partition' in os.path.split(path)[1]:
                partition_file = os.path.join(self.binary_path, path)

                process = subprocess.Popen([sys.executable, partition_tool, partition_file],
                                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                (raw_data, raw_error) = process.communicate()
                if isinstance(raw_error, bytes):
                    raw_error = raw_error.decode()
                if 'Traceback' in raw_error:
                    # Some exception occurred. It is possible that we've tried the wrong binary file.
                    errors.append((path, raw_error))
                    continue

                if isinstance(raw_data, bytes):
                    raw_data = raw_data.decode()
                break
        else:
            traceback_msg = os.linesep.join(['{} {}:{}{}'.format(partition_tool,
                                                                 p,
                                                                 os.linesep,
                                                                 msg) for p, msg in errors])
            raise ValueError('No partition table found for IDF binary path: {}{}{}'.format(self.binary_path,
                                                                                           os.linesep,
                                                                                           traceback_msg))

        partition_table = dict()
        for line in raw_data.splitlines():
            # Blank lines are skipped together with comments; indexing the first
            # character of an empty line would raise IndexError.
            if not line or line.startswith('#'):
                continue
            try:
                _name, _type, _subtype, _offset, _size, _flags = line.split(',')
                # endswith() is safe on an empty size field, which then fails in
                # int() below and is skipped like any other malformed line.
                if _size.endswith('K'):
                    _size = int(_size[:-1]) * 1024
                elif _size.endswith('M'):
                    _size = int(_size[:-1]) * 1024 * 1024
                else:
                    _size = int(_size)
                _offset = int(_offset, 0)
            except ValueError:
                # wrong number of fields or non-numeric size/offset
                continue
            partition_table[_name] = {
                'type': _type,
                'subtype': _subtype,
                'offset': _offset,
                'size': _size,
                'flags': _flags
            }

        return partition_table

    def get_elf_sha256(self):  # type: () -> Optional[str]
        """Return the hex SHA-256 digest of the elf file; the digest is computed once and cached."""
        if self._elf_file_sha256:
            return self._elf_file_sha256

        sha256 = hashlib.sha256()
        with open(self.elf_file, 'rb') as f:
            sha256.update(f.read())
        self._elf_file_sha256 = sha256.hexdigest()
        return self._elf_file_sha256
451
452
class Example(IDFApp):
    """
    Application built from an example directory.

    Binaries are searched in the example's own ``build`` folder first, then in the
    CI build folders below ``case_group.LOCAL_BUILD_DIR``.
    """

    def __init__(self, app_path, config_name='default', target='esp32', case_group=ExampleGroup, artifacts_cls=Artifacts):
        # type: (str, str, str, Type[ExampleGroup], Type[Artifacts]) -> None
        """Empty ``config_name`` / ``target`` fall back to ``'default'`` / ``'esp32'``."""
        if not config_name:
            config_name = 'default'
        if not target:
            target = 'esp32'
        super(Example, self).__init__(app_path, config_name, target, case_group, artifacts_cls)

    def _get_sdkconfig_paths(self):  # type: () -> List[str]
        """
        overrides the parent method to provide exact path of sdkconfig for example tests
        """
        return [os.path.join(self.binary_path, '..', 'sdkconfig')]

    def _try_get_binary_from_local_fs(self):  # type: () -> Optional[str]
        """
        Return the local build directory of the example, or None if there is none.

        A missing CI build root is treated as "not found" so that the caller can
        fall back to downloading artifacts instead of failing in ``os.listdir``.
        """
        # build folder of example path
        path = os.path.join(self.idf_path, self.app_path, 'build')
        if os.path.exists(path):
            return path

        # Search for CI build folders.
        # Path format: $IDF_PATH/build_examples/app_path_with_underscores/config/target
        # (see tools/ci/build_examples.sh)
        # For example: $IDF_PATH/build_examples/examples_get-started_blink/default/esp32
        app_path_underscored = self.app_path.replace(os.path.sep, '_')
        example_path = os.path.join(self.idf_path, self.case_group.LOCAL_BUILD_DIR)
        if not os.path.isdir(example_path):
            return None

        for dirpath in os.listdir(example_path):
            if os.path.basename(dirpath) == app_path_underscored:
                path = os.path.join(example_path, dirpath, self.config_name, self.target, 'build')
                # only the first matching directory is considered
                return path if os.path.exists(path) else None
        return None
488
489
class UT(IDFApp):
    """Unit test application; binaries are looked up in the app or unit-test-app folders."""

    def __init__(self, app_path, config_name='default', target='esp32', case_group=UnitTestGroup, artifacts_cls=UnitTestArtifacts):
        # type: (str, str, str, Type[UnitTestGroup], Type[UnitTestArtifacts]) -> None
        """Empty ``config_name`` / ``target`` fall back to ``'default'`` / ``'esp32'``."""
        config_name = config_name or 'default'
        target = target or 'esp32'
        super(UT, self).__init__(app_path, config_name, target, case_group, artifacts_cls)

    def _try_get_binary_from_local_fs(self):  # type: () -> Optional[str]
        """Return the first existing candidate directory below ``idf_path``, or None."""
        search_order = (
            # build folder of the app itself
            (self.app_path, 'build'),
            # build folder of unit-test-app
            ('tools', 'unit-test-app', 'build'),
            # ``build_unit_test.sh`` will copy binary to output folder
            ('tools', 'unit-test-app', 'output', self.target, self.config_name),
        )
        for components in search_order:
            candidate = os.path.join(self.idf_path, *components)
            if os.path.exists(candidate):
                return candidate
        return None
516
517
class TestApp(Example):
    """Example-style application whose default case group is ``TestAppsGroup``."""

    def __init__(self, app_path, config_name='default', target='esp32', case_group=TestAppsGroup, artifacts_cls=Artifacts):
        # type: (str, str, str, Type[TestAppsGroup], Type[Artifacts]) -> None
        """Forward all arguments unchanged to the ``Example`` constructor."""
        super(TestApp, self).__init__(
            app_path,
            config_name,
            target,
            case_group,
            artifacts_cls,
        )
522
523
class ComponentUTApp(TestApp):
    """Test application whose default case group is ``ComponentUTGroup``."""

    def __init__(self, app_path, config_name='default', target='esp32', case_group=ComponentUTGroup, artifacts_cls=Artifacts):
        # type: (str, str, str, Type[ComponentUTGroup], Type[Artifacts]) -> None
        """Forward all arguments unchanged to the ``TestApp`` constructor."""
        super(ComponentUTApp, self).__init__(
            app_path,
            config_name,
            target,
            case_group,
            artifacts_cls,
        )
528
529
class LoadableElfTestApp(TestApp):
    """
    Test application that is loaded as an elf file instead of being flashed.

    The ``IDFApp`` constructor is deliberately bypassed, so no flash arguments file
    or partition table is parsed for this kind of application.
    """

    def __init__(self, app_path, app_files, config_name='default', target='esp32', case_group=TestAppsGroup, artifacts_cls=Artifacts):
        # type: (str, List[str], str, str, Type[TestAppsGroup], Type[Artifacts]) -> None
        """
        :param app_path: application path, forwarded to the base class above ``IDFApp``
        :param app_files: files to download for the application
        :param config_name: build configuration name, ``'default'`` if empty
        :param target: chip target name, ``'esp32'`` if empty
        :param case_group: class providing the artifact index file
        :param artifacts_cls: class used to download artifacts
        """
        # add arg `app_files` for loadable elf test_app.
        # Such examples only build elf files, so it doesn't generate flasher_args.json.
        # So we can't get app files from config file. Test case should pass it to application.
        super(IDFApp, self).__init__(app_path)
        self.app_path = app_path
        self.app_files = app_files
        self.config_name = config_name or 'default'
        self.target = target or 'esp32'
        self.idf_path = self.get_sdk_path()
        self.case_group = case_group
        self.artifact_cls = artifacts_cls
        self.binary_path = self.get_binary_path()
        self.elf_file = self._get_elf_file_path()
        # IDFApp.__init__ is skipped, so the cache read by the inherited
        # get_elf_sha256() has to be initialised here; otherwise that method
        # fails with AttributeError.
        self._elf_file_sha256 = None  # type: (Optional[str])
        assert os.path.exists(self.binary_path)
547
548
class SSC(IDFApp):
    """SSC application; binary lookup is not implemented yet."""

    def get_binary_path(self):  # type: () -> str
        """Return ``app_path`` unchanged as the binary path."""
        # TODO: to implement SSC get binary path
        binary_path = self.app_path
        return binary_path
553
554
class AT(IDFApp):
    """AT application; binary lookup is not implemented yet."""

    def get_binary_path(self):  # type: () -> str
        """Return ``app_path`` unchanged as the binary path."""
        # TODO: to implement AT get binary path
        binary_path = self.app_path
        return binary_path
559