1# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http:#www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" IDF Test Applications """ 16import hashlib 17import json 18import os 19import re 20import subprocess 21import sys 22from abc import abstractmethod 23 24from tiny_test_fw import App 25 26from .IDFAssignTest import ComponentUTGroup, ExampleGroup, IDFCaseGroup, TestAppsGroup, UnitTestGroup 27 28try: 29 import gitlab_api 30except ImportError: 31 gitlab_api = None 32 33try: 34 from typing import Any, Dict, List, Optional, Tuple, Type # noqa: F401 35except ImportError: 36 pass 37 38 39def parse_encrypted_flag(args, offs, binary): # type: (Dict, str, str) -> Any 40 # Find partition entries (e.g. the entries with an offset and a file) 41 for _, entry in args.items(): 42 # If the current entry is a partition, we have to check whether it is 43 # the one we are looking for or not 44 try: 45 if (entry['offset'], entry['file']) == (offs, binary): 46 return entry['encrypted'] == 'true' 47 except (TypeError, KeyError): 48 # TypeError occurs if the entry is a list, which is possible in JSON 49 # data structure. 50 # KeyError occurs if the entry doesn't have "encrypted" field. 51 continue 52 53 # The entry was not found, return None. The caller will have to check 54 # CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT macro 55 return None 56 57 58def parse_flash_settings(path, default_encryption=False): # type: (str, bool) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]], Dict, Any] 59 file_name = os.path.basename(path) 60 61 # For compatibility reasons, this list contains all the files to be 62 # flashed 63 flash_files = [] 64 # The following list only contains the files that need encryption 65 encrypt_files = [] 66 67 if file_name == 'flasher_args.json': 68 # CMake version using build metadata file 69 with open(path, 'r') as f: 70 args = json.load(f) 71 72 for (offs, binary) in args['flash_files'].items(): 73 if offs: 74 flash_files.append((offs, binary)) 75 encrypted = parse_encrypted_flag(args, offs, binary) 76 77 # default_encryption should be taken into account if and only if 78 # encrypted flag is not provided in the JSON file. 79 if (encrypted is None and default_encryption) or encrypted: 80 encrypt_files.append((offs, binary)) 81 82 flash_settings = args['flash_settings'] 83 app_name = os.path.splitext(args['app']['file'])[0] 84 else: 85 # GNU Make version uses download.config arguments file 86 with open(path, 'r') as f: 87 args = f.readlines()[-1].split(' ') 88 flash_settings = {} 89 for idx in range(0, len(args), 2): # process arguments in pairs 90 if args[idx].startswith('--'): 91 # strip the -- from the command line argument 92 flash_settings[args[idx][2:]] = args[idx + 1] 93 else: 94 # offs, filename 95 flash_files.append((args[idx], args[idx + 1])) 96 # Parameter default_encryption tells us if the files need encryption 97 if default_encryption: 98 encrypt_files = flash_files 99 # we can only guess app name in download.config. 100 for p in flash_files: 101 if not os.path.dirname(p[1]) and 'partition' not in p[1]: 102 # app bin usually in the same dir with download.config and it's not partition table 103 app_name = os.path.splitext(p[1])[0] 104 break 105 else: 106 app_name = None 107 return flash_files, encrypt_files, flash_settings, app_name 108 109 110class Artifacts(object): 111 def __init__(self, dest_root_path, artifact_index_file, app_path, config_name, target): 112 # type: (str, str, str, str, str) -> None 113 assert gitlab_api 114 # at least one of app_path or config_name is not None. otherwise we can't match artifact 115 assert app_path or config_name 116 assert os.path.exists(artifact_index_file) 117 self.gitlab_inst = gitlab_api.Gitlab(os.getenv('CI_PROJECT_ID')) 118 self.dest_root_path = dest_root_path 119 with open(artifact_index_file, 'r') as f: 120 artifact_index = json.load(f) 121 self.artifact_info = self._find_artifact(artifact_index, app_path, config_name, target) 122 123 @staticmethod 124 def _find_artifact(artifact_index, app_path, config_name, target): # type: ignore 125 for artifact_info in artifact_index: 126 match_result = True 127 if app_path: 128 # We use endswith here to avoid issue like: 129 # examples_protocols_mqtt_ws but return a examples_protocols_mqtt_wss failure 130 match_result = artifact_info['app_dir'].endswith(app_path) 131 if config_name: 132 match_result = match_result and config_name == artifact_info['config'] 133 if target: 134 match_result = match_result and target == artifact_info['target'] 135 if match_result: 136 ret = artifact_info 137 break 138 else: 139 ret = None 140 return ret 141 142 def _get_app_base_path(self): # type: () -> Any 143 if self.artifact_info: 144 return os.path.join(self.artifact_info['work_dir'], self.artifact_info['build_dir']) 145 else: 146 return None 147 148 def _get_flash_arg_file(self, base_path, job_id): # type: (str, str) -> str 149 if self.artifact_info['build_system'] == 'cmake': 150 flash_arg_file = os.path.join(base_path, 'flasher_args.json') 151 else: 152 flash_arg_file = os.path.join(base_path, 'download.config') 153 154 self.gitlab_inst.download_artifact(job_id, [flash_arg_file], self.dest_root_path) 155 return flash_arg_file 156 157 def _download_binary_files(self, base_path, job_id, flash_arg_file): # type: (str, str, str) -> None 158 # Let's ignore the second value returned (encrypt_files) as these 159 # files also appear in the first list 160 flash_files, _, _, app_name = parse_flash_settings(os.path.join(self.dest_root_path, flash_arg_file)) 161 artifact_files = [os.path.join(base_path, p[1]) for p in flash_files] 162 artifact_files.append(os.path.join(base_path, app_name + '.elf')) 163 164 bootloader_path = os.path.join(base_path, 'bootloader', 'bootloader.bin') 165 if bootloader_path not in artifact_files: 166 artifact_files.append(bootloader_path) 167 168 self.gitlab_inst.download_artifact(job_id, artifact_files, self.dest_root_path) 169 170 def _download_sdkconfig_file(self, base_path, job_id): # type: (str, str) -> None 171 self.gitlab_inst.download_artifact(job_id, [os.path.join(os.path.dirname(base_path), 'sdkconfig')], 172 self.dest_root_path) 173 174 def download_artifacts(self): # type: () -> Any 175 if not self.artifact_info: 176 return None 177 base_path = self._get_app_base_path() 178 job_id = self.artifact_info['ci_job_id'] 179 # 1. download flash args file 180 flash_arg_file = self._get_flash_arg_file(base_path, job_id) 181 182 # 2. download all binary files 183 self._download_binary_files(base_path, job_id, flash_arg_file) 184 185 # 3. download sdkconfig file 186 self._download_sdkconfig_file(base_path, job_id) 187 return base_path 188 189 def download_artifact_files(self, file_names): # type: (List[str]) -> Any 190 if self.artifact_info: 191 base_path = os.path.join(self.artifact_info['work_dir'], self.artifact_info['build_dir']) 192 job_id = self.artifact_info['ci_job_id'] 193 194 # download all binary files 195 artifact_files = [os.path.join(base_path, fn) for fn in file_names] 196 self.gitlab_inst.download_artifact(job_id, artifact_files, self.dest_root_path) 197 198 # download sdkconfig file 199 self.gitlab_inst.download_artifact(job_id, [os.path.join(os.path.dirname(base_path), 'sdkconfig')], 200 self.dest_root_path) 201 else: 202 base_path = None 203 return base_path 204 205 206class UnitTestArtifacts(Artifacts): 207 BUILDS_DIR_RE = re.compile(r'^builds/') 208 209 def _get_app_base_path(self): # type: () -> Any 210 if self.artifact_info: 211 output_dir = self.BUILDS_DIR_RE.sub('output/', self.artifact_info['build_dir']) 212 return os.path.join(self.artifact_info['app_dir'], output_dir) 213 else: 214 return None 215 216 def _download_sdkconfig_file(self, base_path, job_id): # type: (str, str) -> None 217 self.gitlab_inst.download_artifact(job_id, [os.path.join(base_path, 'sdkconfig')], self.dest_root_path) 218 219 220class IDFApp(App.BaseApp): 221 """ 222 Implements common esp-idf application behavior. 223 idf applications should inherent from this class and overwrite method get_binary_path. 224 """ 225 226 IDF_DOWNLOAD_CONFIG_FILE = 'download.config' 227 IDF_FLASH_ARGS_FILE = 'flasher_args.json' 228 229 def __init__(self, app_path, config_name=None, target=None, case_group=IDFCaseGroup, artifact_cls=Artifacts): # type: ignore 230 super(IDFApp, self).__init__(app_path) 231 self.app_path = app_path # type: (str) 232 self.config_name = config_name # type: (str) 233 self.target = target # type: (str) 234 self.idf_path = self.get_sdk_path() # type: (str) 235 self.case_group = case_group 236 self.artifact_cls = artifact_cls 237 self.binary_path = self.get_binary_path() 238 self.elf_file = self._get_elf_file_path() 239 self._elf_file_sha256 = None # type: (Optional[str]) 240 assert os.path.exists(self.binary_path) 241 if self.IDF_DOWNLOAD_CONFIG_FILE not in os.listdir(self.binary_path): 242 if self.IDF_FLASH_ARGS_FILE not in os.listdir(self.binary_path): 243 msg = ('Neither {} nor {} exists. ' 244 "Try to run 'make print_flash_cmd | tail -n 1 > {}/{}' " 245 "or 'idf.py build' " 246 'for resolving the issue.' 247 '').format(self.IDF_DOWNLOAD_CONFIG_FILE, self.IDF_FLASH_ARGS_FILE, 248 self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE) 249 raise AssertionError(msg) 250 251 # In order to keep backward compatibility, flash_files is unchanged. 252 # However, we now have a new attribute encrypt_files. 253 self.flash_files, self.encrypt_files, self.flash_settings = self._parse_flash_download_config() 254 self.partition_table = self._parse_partition_table() 255 256 def __str__(self): # type: () -> str 257 parts = ['app<{}>'.format(self.app_path)] 258 if self.config_name: 259 parts.append('config<{}>'.format(self.config_name)) 260 if self.target: 261 parts.append('target<{}>'.format(self.target)) 262 return ' '.join(parts) 263 264 @classmethod 265 def get_sdk_path(cls): # type: () -> str 266 idf_path = os.getenv('IDF_PATH') 267 assert idf_path 268 assert os.path.exists(idf_path) 269 return idf_path 270 271 def _get_sdkconfig_paths(self): # type: () -> List[str] 272 """ 273 returns list of possible paths where sdkconfig could be found 274 275 Note: could be overwritten by a derived class to provide other locations or order 276 """ 277 return [os.path.join(self.binary_path, 'sdkconfig'), os.path.join(self.binary_path, '..', 'sdkconfig')] 278 279 def get_sdkconfig(self): # type: () -> Dict 280 """ 281 reads sdkconfig and returns a dictionary with all configured variables 282 283 :raise: AssertionError: if sdkconfig file does not exist in defined paths 284 """ 285 d = {} 286 sdkconfig_file = None 287 for i in self._get_sdkconfig_paths(): 288 if os.path.exists(i): 289 sdkconfig_file = i 290 break 291 assert sdkconfig_file is not None 292 with open(sdkconfig_file) as f: 293 for line in f: 294 configs = line.split('=') 295 if len(configs) == 2: 296 d[configs[0]] = configs[1].rstrip() 297 return d 298 299 def get_sdkconfig_config_value(self, config_key): # type: (str) -> Any 300 sdkconfig_dict = self.get_sdkconfig() 301 value = None 302 if (config_key in sdkconfig_dict): 303 value = sdkconfig_dict[config_key] 304 return value 305 306 @abstractmethod 307 def _try_get_binary_from_local_fs(self): # type: () -> Optional[str] 308 pass 309 310 def get_binary_path(self): # type: () -> str 311 path = self._try_get_binary_from_local_fs() 312 if path: 313 return path 314 315 artifacts = self.artifact_cls(self.idf_path, 316 self.case_group.get_artifact_index_file(), 317 self.app_path, self.config_name, self.target) 318 if isinstance(self, LoadableElfTestApp): 319 assert self.app_files 320 path = artifacts.download_artifact_files(self.app_files) 321 else: 322 path = artifacts.download_artifacts() 323 324 if path: 325 return os.path.join(self.idf_path, path) 326 else: 327 raise OSError('Failed to get binary for {}'.format(self)) 328 329 def _get_elf_file_path(self): # type: () -> str 330 ret = '' 331 file_names = os.listdir(self.binary_path) 332 for fn in file_names: 333 if os.path.splitext(fn)[1] == '.elf': 334 ret = os.path.join(self.binary_path, fn) 335 return ret 336 337 def _int_offs_abs_paths(self, files_list): # type: (List[tuple[str, str]]) -> List[Tuple[int, str]] 338 return [(int(offs, 0), 339 os.path.join(self.binary_path, file_path.strip())) 340 for (offs, file_path) in files_list] 341 342 def _parse_flash_download_config(self): # type: () -> Tuple[List[tuple[int, str]], List[tuple[int, str]], Dict] 343 """ 344 Parse flash download config from build metadata files 345 346 Sets self.flash_files, self.flash_settings 347 348 (Called from constructor) 349 350 Returns (flash_files, encrypt_files, flash_settings) 351 """ 352 353 if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path): 354 # CMake version using build metadata file 355 path = os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE) 356 else: 357 # GNU Make version uses download.config arguments file 358 path = os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE) 359 360 # If the JSON doesn't find the encrypted flag for our files, provide 361 # a default encrpytion flag: the macro 362 # CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT 363 sdkconfig_dict = self.get_sdkconfig() 364 default_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict 365 366 flash_files, encrypt_files, flash_settings, _ = parse_flash_settings(path, default_encryption) 367 368 # Flash setting "encrypt" only and only if all the files to flash 369 # must be encrypted. Else, this parameter should be False. 370 # All files must be encrypted is both file lists are the same 371 flash_settings['encrypt'] = sorted(flash_files) == sorted(encrypt_files) 372 373 return self._int_offs_abs_paths(flash_files), self._int_offs_abs_paths(encrypt_files), flash_settings 374 375 def _parse_partition_table(self): # type: ignore 376 """ 377 Parse partition table contents based on app binaries 378 379 Returns partition_table data 380 381 (Called from constructor) 382 """ 383 partition_tool = os.path.join(self.idf_path, 384 'components', 385 'partition_table', 386 'gen_esp32part.py') 387 assert os.path.exists(partition_tool) 388 389 errors = [] 390 # self.flash_files is sorted based on offset in order to have a consistent result with different versions of 391 # Python 392 for (_, path) in sorted(self.flash_files, key=lambda elem: elem[0]): 393 if 'partition' in os.path.split(path)[1]: 394 partition_file = os.path.join(self.binary_path, path) 395 396 process = subprocess.Popen([sys.executable, partition_tool, partition_file], 397 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 398 (raw_data, raw_error) = process.communicate() 399 if isinstance(raw_error, bytes): 400 raw_error = raw_error.decode() 401 if 'Traceback' in raw_error: 402 # Some exception occurred. It is possible that we've tried the wrong binary file. 403 errors.append((path, raw_error)) 404 continue 405 406 if isinstance(raw_data, bytes): 407 raw_data = raw_data.decode() 408 break 409 else: 410 traceback_msg = os.linesep.join(['{} {}:{}{}'.format(partition_tool, 411 p, 412 os.linesep, 413 msg) for p, msg in errors]) 414 raise ValueError('No partition table found for IDF binary path: {}{}{}'.format(self.binary_path, 415 os.linesep, 416 traceback_msg)) 417 418 partition_table = dict() 419 for line in raw_data.splitlines(): 420 if line[0] != '#': 421 try: 422 _name, _type, _subtype, _offset, _size, _flags = line.split(',') 423 if _size[-1] == 'K': 424 _size = int(_size[:-1]) * 1024 425 elif _size[-1] == 'M': 426 _size = int(_size[:-1]) * 1024 * 1024 427 else: 428 _size = int(_size) 429 _offset = int(_offset, 0) 430 except ValueError: 431 continue 432 partition_table[_name] = { 433 'type': _type, 434 'subtype': _subtype, 435 'offset': _offset, 436 'size': _size, 437 'flags': _flags 438 } 439 440 return partition_table 441 442 def get_elf_sha256(self): # type: () -> Optional[str] 443 if self._elf_file_sha256: 444 return self._elf_file_sha256 445 446 sha256 = hashlib.sha256() 447 with open(self.elf_file, 'rb') as f: 448 sha256.update(f.read()) 449 self._elf_file_sha256 = sha256.hexdigest() 450 return self._elf_file_sha256 451 452 453class Example(IDFApp): 454 def __init__(self, app_path, config_name='default', target='esp32', case_group=ExampleGroup, artifacts_cls=Artifacts): 455 # type: (str, str, str, Type[ExampleGroup], Type[Artifacts]) -> None 456 if not config_name: 457 config_name = 'default' 458 if not target: 459 target = 'esp32' 460 super(Example, self).__init__(app_path, config_name, target, case_group, artifacts_cls) 461 462 def _get_sdkconfig_paths(self): # type: () -> List[str] 463 """ 464 overrides the parent method to provide exact path of sdkconfig for example tests 465 """ 466 return [os.path.join(self.binary_path, '..', 'sdkconfig')] 467 468 def _try_get_binary_from_local_fs(self): # type: () -> Optional[str] 469 # build folder of example path 470 path = os.path.join(self.idf_path, self.app_path, 'build') 471 if os.path.exists(path): 472 return path 473 474 # Search for CI build folders. 475 # Path format: $IDF_PATH/build_examples/app_path_with_underscores/config/target 476 # (see tools/ci/build_examples.sh) 477 # For example: $IDF_PATH/build_examples/examples_get-started_blink/default/esp32 478 app_path_underscored = self.app_path.replace(os.path.sep, '_') 479 example_path = os.path.join(self.idf_path, self.case_group.LOCAL_BUILD_DIR) 480 for dirpath in os.listdir(example_path): 481 if os.path.basename(dirpath) == app_path_underscored: 482 path = os.path.join(example_path, dirpath, self.config_name, self.target, 'build') 483 if os.path.exists(path): 484 return path 485 else: 486 return None 487 return None 488 489 490class UT(IDFApp): 491 def __init__(self, app_path, config_name='default', target='esp32', case_group=UnitTestGroup, artifacts_cls=UnitTestArtifacts): 492 # type: (str, str, str, Type[UnitTestGroup], Type[UnitTestArtifacts]) -> None 493 if not config_name: 494 config_name = 'default' 495 if not target: 496 target = 'esp32' 497 super(UT, self).__init__(app_path, config_name, target, case_group, artifacts_cls) 498 499 def _try_get_binary_from_local_fs(self): # type: () -> Optional[str] 500 path = os.path.join(self.idf_path, self.app_path, 'build') 501 if os.path.exists(path): 502 return path 503 504 # first try to get from build folder of unit-test-app 505 path = os.path.join(self.idf_path, 'tools', 'unit-test-app', 'build') 506 if os.path.exists(path): 507 # found, use bin in build path 508 return path 509 510 # ``build_unit_test.sh`` will copy binary to output folder 511 path = os.path.join(self.idf_path, 'tools', 'unit-test-app', 'output', self.target, self.config_name) 512 if os.path.exists(path): 513 return path 514 515 return None 516 517 518class TestApp(Example): 519 def __init__(self, app_path, config_name='default', target='esp32', case_group=TestAppsGroup, artifacts_cls=Artifacts): 520 # type: (str, str, str, Type[TestAppsGroup], Type[Artifacts]) -> None 521 super(TestApp, self).__init__(app_path, config_name, target, case_group, artifacts_cls) 522 523 524class ComponentUTApp(TestApp): 525 def __init__(self, app_path, config_name='default', target='esp32', case_group=ComponentUTGroup, artifacts_cls=Artifacts): 526 # type: (str, str, str, Type[ComponentUTGroup], Type[Artifacts]) -> None 527 super(ComponentUTApp, self).__init__(app_path, config_name, target, case_group, artifacts_cls) 528 529 530class LoadableElfTestApp(TestApp): 531 def __init__(self, app_path, app_files, config_name='default', target='esp32', case_group=TestAppsGroup, artifacts_cls=Artifacts): 532 # type: (str, List[str], str, str, Type[TestAppsGroup], Type[Artifacts]) -> None 533 # add arg `app_files` for loadable elf test_app. 534 # Such examples only build elf files, so it doesn't generate flasher_args.json. 535 # So we can't get app files from config file. Test case should pass it to application. 536 super(IDFApp, self).__init__(app_path) 537 self.app_path = app_path 538 self.app_files = app_files 539 self.config_name = config_name or 'default' 540 self.target = target or 'esp32' 541 self.idf_path = self.get_sdk_path() 542 self.case_group = case_group 543 self.artifact_cls = artifacts_cls 544 self.binary_path = self.get_binary_path() 545 self.elf_file = self._get_elf_file_path() 546 assert os.path.exists(self.binary_path) 547 548 549class SSC(IDFApp): 550 def get_binary_path(self): # type: () -> str 551 # TODO: to implement SSC get binary path 552 return self.app_path 553 554 555class AT(IDFApp): 556 def get_binary_path(self): # type: () -> str 557 # TODO: to implement AT get binary path 558 return self.app_path 559