1# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http:#www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15""" Test Env, manages DUT, App and EnvConfig, interface for test cases to access these components """
16import functools
17import os
18import threading
19import traceback
20
21import netifaces
22
23from . import EnvConfig
24
25
26def _synced(func):
27    @functools.wraps(func)
28    def decorator(self, *args, **kwargs):
29        with self.lock:
30            ret = func(self, *args, **kwargs)
31        return ret
32
33    decorator.__doc__ = func.__doc__
34    return decorator
35
36
37class Env(object):
38    """
39    test env, manages DUTs and env configs.
40
41    :keyword app: class for default application
42    :keyword dut: class for default DUT
43    :keyword env_tag: test env tag, used to select configs from env config file
44    :keyword env_config_file: test env config file path
45    :keyword test_name: test suite name, used when generate log folder name
46    """
47    CURRENT_LOG_FOLDER = ''
48
49    def __init__(self,
50                 app=None,
51                 dut=None,
52                 env_tag=None,
53                 env_config_file=None,
54                 test_suite_name=None,
55                 **kwargs):
56        self.app_cls = app
57        self.default_dut_cls = dut
58        self.config = EnvConfig.Config(env_config_file, env_tag)
59        self.log_path = self.app_cls.get_log_folder(test_suite_name)
60        if not os.path.exists(self.log_path):
61            os.makedirs(self.log_path)
62
63        Env.CURRENT_LOG_FOLDER = self.log_path
64
65        self.allocated_duts = dict()
66        self.lock = threading.RLock()
67
68    @_synced
69    def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, app_config_name=None, **dut_init_args):
70        """
71        get_dut(dut_name, app_path, dut_class=None, app_class=None)
72
73        :param dut_name: user defined name for DUT
74        :param app_path: application path, app instance will use this path to process application info
75        :param dut_class: dut class, if not specified will use default dut class of env
76        :param app_class: app class, if not specified will use default app of env
77        :param app_config_name: app build config
78        :keyword dut_init_args: extra kwargs used when creating DUT instance
79        :return: dut instance
80        """
81        if dut_name in self.allocated_duts:
82            dut = self.allocated_duts[dut_name]['dut']
83        else:
84            if dut_class is None:
85                dut_class = self.default_dut_cls
86            if app_class is None:
87                app_class = self.app_cls
88
89            app_target = dut_class.TARGET
90            detected_target = None
91
92            try:
93                port = self.config.get_variable(dut_name)
94                if not app_target:
95                    result, detected_target = dut_class.confirm_dut(port)
96            except ValueError:
97                # try to auto detect ports
98                allocated_ports = [self.allocated_duts[x]['port'] for x in self.allocated_duts]
99                available_ports = dut_class.list_available_ports()
100                for port in available_ports:
101                    if port not in allocated_ports:
102                        result, detected_target = dut_class.confirm_dut(port)
103                        if result:
104                            break
105                else:
106                    port = None
107
108            if not app_target:
109                app_target = detected_target
110            if not app_target:
111                raise ValueError("DUT class doesn't specify the target, and autodetection failed")
112            app_inst = app_class(app_path, app_config_name, app_target)
113
114            if port:
115                try:
116                    dut_config = self.get_variable(dut_name + '_port_config')
117                except ValueError:
118                    dut_config = dict()
119                dut_config.update(dut_init_args)
120                dut = dut_class(dut_name, port,
121                                os.path.join(self.log_path, dut_name + '.log'),
122                                app_inst,
123                                **dut_config)
124                self.allocated_duts[dut_name] = {'port': port, 'dut': dut}
125            else:
126                raise ValueError('Failed to get DUT')
127        return dut
128
129    @_synced
130    def close_dut(self, dut_name):
131        """
132        close_dut(dut_name)
133        close one DUT by name if DUT name is valid (the name used by ``get_dut``). otherwise will do nothing.
134
135        :param dut_name: user defined name for DUT
136        :return: None
137        """
138        try:
139            dut = self.allocated_duts.pop(dut_name)['dut']
140            dut.close()
141        except KeyError:
142            pass
143
144    @_synced
145    def get_variable(self, variable_name):
146        """
147        get_variable(variable_name)
148        get variable from config file. If failed then try to auto-detected it.
149
150        :param variable_name: name of the variable
151        :return: value of variable if successfully found. otherwise None.
152        """
153        return self.config.get_variable(variable_name)
154
155    PROTO_MAP = {
156        'ipv4': netifaces.AF_INET,
157        'ipv6': netifaces.AF_INET6,
158        'mac': netifaces.AF_LINK,
159    }
160
161    @_synced
162    def get_pc_nic_info(self, nic_name='pc_nic', proto='ipv4'):
163        """
164        get_pc_nic_info(nic_name="pc_nic")
165        try to get info of a specified NIC and protocol.
166
167        :param nic_name: pc nic name. allows passing variable name, nic name value.
168        :param proto: "ipv4", "ipv6" or "mac"
169        :return: a dict of nic info if successfully found. otherwise None.
170                 nic info keys could be different for different protocols.
171                 key "addr" is available for both mac, ipv4 and ipv6 pic info.
172        """
173        interfaces = netifaces.interfaces()
174        if nic_name in interfaces:
175            # the name is in the interface list, we regard it as NIC name
176            if_addr = netifaces.ifaddresses(nic_name)
177        else:
178            # it's not in interface name list, we assume it's variable name
179            _nic_name = self.get_variable(nic_name)
180            if_addr = netifaces.ifaddresses(_nic_name)
181
182        return if_addr[self.PROTO_MAP[proto]][0]
183
184    @_synced
185    def close(self, dut_debug=False):
186        """
187        close()
188        close all DUTs of the Env.
189
190        :param dut_debug: if dut_debug is True, then print all dut expect failures before close it
191        :return: exceptions during close DUT
192        """
193        dut_close_errors = []
194        for dut_name in self.allocated_duts:
195            dut = self.allocated_duts[dut_name]['dut']
196            try:
197                if dut_debug:
198                    dut.print_debug_info()
199                dut.close()
200            except Exception as e:
201                traceback.print_exc()
202                dut_close_errors.append(e)
203        self.allocated_duts = dict()
204        return dut_close_errors
205