# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Test Env, manages DUT, App and EnvConfig, interface for test cases to access these components """
import functools
import os
import threading
import traceback

import netifaces

from . import EnvConfig


def _synced(func):
    """
    decorator that runs the wrapped method while holding ``self.lock``.

    :param func: method to wrap. the object it is bound to must provide a ``lock`` attribute
                 usable as a context manager (``Env`` creates a ``threading.RLock`` for this).
    :return: wrapped method, carrying the name and docstring of ``func``
    """
    @functools.wraps(func)
    def decorator(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)

    # functools.wraps already copies __doc__, so no manual assignment is needed
    return decorator


class Env(object):
    """
    test env, manages DUTs and env configs.

    :keyword app: class for default application
    :keyword dut: class for default DUT
    :keyword env_tag: test env tag, used to select configs from env config file
    :keyword env_config_file: test env config file path
    :keyword test_suite_name: test suite name, used when generate log folder name
    """
    # log folder of the most recently created Env instance (set in ``__init__``)
    CURRENT_LOG_FOLDER = ''

    def __init__(self,
                 app=None,
                 dut=None,
                 env_tag=None,
                 env_config_file=None,
                 test_suite_name=None,
                 **kwargs):
        # extra keyword arguments are accepted but not used by this class
        self.app_cls = app
        self.default_dut_cls = dut
        self.config = EnvConfig.Config(env_config_file, env_tag)
        self.log_path = self.app_cls.get_log_folder(test_suite_name)
        if not os.path.exists(self.log_path):
            os.makedirs(self.log_path)

        Env.CURRENT_LOG_FOLDER = self.log_path

        # maps dut_name -> {'port': port, 'dut': dut instance}
        self.allocated_duts = dict()
        # re-entrant, because synced methods call other synced methods (get_dut -> get_variable)
        self.lock = threading.RLock()

    @_synced
    def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, app_config_name=None, **dut_init_args):
        """
        get_dut(dut_name, app_path, dut_class=None, app_class=None, app_config_name=None, **dut_init_args)

        get the DUT registered under ``dut_name``, creating it on first use.
        the port is taken from the env config variable named ``dut_name``; if that lookup raises
        ``ValueError`` the ports reported by ``dut_class.list_available_ports()`` are probed instead,
        skipping ports already used by allocated DUTs.

        :param dut_name: user defined name for DUT
        :param app_path: application path, app instance will use this path to process application info
        :param dut_class: dut class, if not specified will use default dut class of env
        :param app_class: app class, if not specified will use default app of env
        :param app_config_name: app build config
        :keyword dut_init_args: extra kwargs used when creating DUT instance.
                                they take precedence over the ``<dut_name>_port_config`` env config variable.
        :return: dut instance
        :raise ValueError: if the target can't be determined, or no port could be found for the DUT
        """
        if dut_name in self.allocated_duts:
            return self.allocated_duts[dut_name]['dut']

        if dut_class is None:
            dut_class = self.default_dut_cls
        if app_class is None:
            app_class = self.app_cls

        app_target = dut_class.TARGET
        detected_target = None

        try:
            port = self.config.get_variable(dut_name)
            if not app_target:
                # only the detected target is needed here, the configured port is used as is
                _, detected_target = dut_class.confirm_dut(port)
        except ValueError:
            # try to auto detect ports
            allocated_ports = [entry['port'] for entry in self.allocated_duts.values()]
            for port in dut_class.list_available_ports():
                if port in allocated_ports:
                    continue
                result, detected_target = dut_class.confirm_dut(port)
                if result:
                    break
            else:
                # no candidate port was confirmed
                port = None

        if not app_target:
            app_target = detected_target
        if not app_target:
            raise ValueError("DUT class doesn't specify the target, and autodetection failed")
        app_inst = app_class(app_path, app_config_name, app_target)

        if not port:
            raise ValueError('Failed to get DUT')

        try:
            port_config = self.get_variable(dut_name + '_port_config')
        except ValueError:
            port_config = {}
        # work on a copy: updating the object returned by the config in place would leak
        # this call's init args into later lookups of the same variable
        dut_config = dict(port_config)
        dut_config.update(dut_init_args)

        dut = dut_class(dut_name, port,
                        os.path.join(self.log_path, dut_name + '.log'),
                        app_inst,
                        **dut_config)
        self.allocated_duts[dut_name] = {'port': port, 'dut': dut}
        return dut

    @_synced
    def close_dut(self, dut_name):
        """
        close_dut(dut_name)
        close one DUT by name if DUT name is valid (the name used by ``get_dut``). otherwise will do nothing.
        exceptions raised while closing the DUT are propagated to the caller.

        :param dut_name: user defined name for DUT
        :return: None
        """
        # only the lookup is guarded, so a KeyError raised inside dut.close() is not hidden
        try:
            entry = self.allocated_duts.pop(dut_name)
        except KeyError:
            return
        entry['dut'].close()

    @_synced
    def get_variable(self, variable_name):
        """
        get_variable(variable_name)
        get variable from env config. the lookup is delegated to the ``get_variable`` method of the config object.

        :param variable_name: name of the variable
        :return: value of the variable as returned by the env config.
                 NOTE(review): ``get_dut`` treats ``ValueError`` from this lookup as "variable not defined";
                 confirm against ``EnvConfig``.
        """
        return self.config.get_variable(variable_name)

    # maps the ``proto`` argument of ``get_pc_nic_info`` to netifaces address family constants
    PROTO_MAP = {
        'ipv4': netifaces.AF_INET,
        'ipv6': netifaces.AF_INET6,
        'mac': netifaces.AF_LINK,
    }

    @_synced
    def get_pc_nic_info(self, nic_name='pc_nic', proto='ipv4'):
        """
        get_pc_nic_info(nic_name="pc_nic", proto="ipv4")
        try to get info of a specified NIC and protocol.

        :param nic_name: pc nic name. allows passing variable name, nic name value.
        :param proto: "ipv4", "ipv6" or "mac"
        :return: the first entry netifaces reports for the NIC and the requested protocol.
                 nic info keys could be different for different protocols.
                 NOTE(review): the original doc states key "addr" is available for mac, ipv4 and ipv6 nic info.
        :raise KeyError: if ``proto`` is not a key of ``PROTO_MAP``, or the NIC info has no entry for it
        """
        if nic_name not in netifaces.interfaces():
            # it's not in interface name list, we assume it's variable name
            nic_name = self.get_variable(nic_name)
        if_addr = netifaces.ifaddresses(nic_name)

        return if_addr[self.PROTO_MAP[proto]][0]

    @_synced
    def close(self, dut_debug=False):
        """
        close(dut_debug=False)
        close all DUTs of the Env. every DUT gets closed even if printing debug info
        or closing another DUT fails. tracebacks of failures are printed.

        :param dut_debug: if dut_debug is True, then print all dut expect failures before close it
        :return: list of exceptions raised while printing debug info or closing DUTs
        """
        dut_close_errors = []
        for entry in self.allocated_duts.values():
            dut = entry['dut']
            if dut_debug:
                # guarded separately so a failure here can't skip dut.close() below
                try:
                    dut.print_debug_info()
                except Exception as e:
                    traceback.print_exc()
                    dut_close_errors.append(e)
            try:
                dut.close()
            except Exception as e:
                traceback.print_exc()
                dut_close_errors.append(e)
        self.allocated_duts = dict()
        return dut_close_errors