1# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http:#www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" 16DUT provides 3 major groups of features: 17 18* DUT port feature, provide basic open/close/read/write features 19* DUT tools, provide extra methods to control the device, like download and start app 20* DUT expect method, provide features for users to check DUT outputs 21 22The current design of DUT have 3 classes for one DUT: BaseDUT, DUTPort, DUTTool. 23 24* BaseDUT class: 25 * defines methods DUT port and DUT tool need to overwrite 26 * provide the expect methods and some other methods based on DUTPort 27* DUTPort class: 28 * inherent from BaseDUT class 29 * implements the port features by overwriting port methods defined in BaseDUT 30* DUTTool class: 31 * inherent from one of the DUTPort class 32 * implements the tools features by overwriting tool methods defined in BaseDUT 33 * could add some new methods provided by the tool 34 35This module implements the BaseDUT class and one of the port class SerialDUT. 36User should implement their DUTTool classes. 37If they using different port then need to implement their DUTPort class as well. 38""" 39 40from __future__ import print_function 41 42import copy 43import functools 44import re 45import sys 46import threading 47import time 48 49# python2 and python3 queue package name is different 50try: 51 import Queue as _queue 52except ImportError: 53 import queue as _queue # type: ignore 54 55try: 56 from typing import Callable, List 57except ImportError: 58 # Only used for type annotations 59 pass 60 61import serial 62from serial.tools import list_ports 63 64from . import Utility 65 66 67class ExpectTimeout(ValueError): 68 """ timeout for expect method """ 69 pass 70 71 72class UnsupportedExpectItem(ValueError): 73 """ expect item not supported by the expect method """ 74 pass 75 76 77def _expect_lock(func): 78 @functools.wraps(func) 79 def handler(self, *args, **kwargs): 80 with self.expect_lock: 81 ret = func(self, *args, **kwargs) 82 return ret 83 return handler 84 85 86def _decode_data(data): 87 """ for python3, if the data is bytes, then decode it to string """ 88 if isinstance(data, bytes): 89 # convert bytes to string. This is a bit of a hack, we know that we want to log this 90 # later so encode to the stdout encoding with backslash escapes for anything non-encodable 91 try: 92 return data.decode(sys.stdout.encoding, 'backslashreplace') 93 except UnicodeDecodeError: # Python <3.5 doesn't support backslashreplace 94 return data.decode(sys.stdout.encoding, 'replace') 95 return data 96 97 98def _pattern_to_string(pattern): 99 try: 100 ret = 'RegEx: ' + pattern.pattern 101 except AttributeError: 102 ret = pattern 103 return ret 104 105 106class _DataCache(_queue.Queue): 107 """ 108 Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue." 109 """ 110 111 def __init__(self, maxsize=0): 112 _queue.Queue.__init__(self, maxsize=maxsize) 113 self.data_cache = str() 114 115 def _move_from_queue_to_cache(self): 116 """ 117 move all of the available data in the queue to cache 118 119 :return: True if moved any item from queue to data cache, else False 120 """ 121 ret = False 122 while True: 123 try: 124 self.data_cache += _decode_data(self.get(0)) 125 ret = True 126 except _queue.Empty: 127 break 128 return ret 129 130 def get_data(self, timeout=0.0): 131 """ 132 get a copy of data from cache. 133 134 :param timeout: timeout for waiting new queue item 135 :return: copy of data cache 136 """ 137 # make sure timeout is non-negative 138 if timeout < 0: 139 timeout = 0 140 141 ret = self._move_from_queue_to_cache() 142 143 if not ret: 144 # we only wait for new data if we can't provide a new data_cache 145 try: 146 data = self.get(timeout=timeout) 147 self.data_cache += _decode_data(data) 148 except _queue.Empty: 149 # don't do anything when on update for cache 150 pass 151 return copy.deepcopy(self.data_cache) 152 153 def flush(self, index=0xFFFFFFFF): 154 """ 155 flush data from cache. 156 157 :param index: if < 0 then don't do flush, otherwise flush data before index 158 :return: None 159 """ 160 # first add data in queue to cache 161 self.get_data() 162 163 if index > 0: 164 self.data_cache = self.data_cache[index:] 165 166 167class _LogThread(threading.Thread, _queue.Queue): 168 """ 169 We found some SD card on Raspberry Pi could have very bad performance. 170 It could take seconds to save small amount of data. 171 If the DUT receives data and save it as log, then it stops receiving data until log is saved. 172 This could lead to expect timeout. 173 As an workaround to this issue, ``BaseDUT`` class will create a thread to save logs. 174 Then data will be passed to ``expect`` as soon as received. 175 """ 176 def __init__(self): 177 threading.Thread.__init__(self, name='LogThread') 178 _queue.Queue.__init__(self, maxsize=0) 179 self.setDaemon(True) 180 self.flush_lock = threading.Lock() 181 182 def save_log(self, filename, data): 183 """ 184 :param filename: log file name 185 :param data: log data. Must be ``bytes``. 186 """ 187 self.put({'filename': filename, 'data': data}) 188 189 def flush_data(self): 190 with self.flush_lock: 191 data_cache = dict() 192 while True: 193 # move all data from queue to data cache 194 try: 195 log = self.get_nowait() 196 try: 197 data_cache[log['filename']] += log['data'] 198 except KeyError: 199 data_cache[log['filename']] = log['data'] 200 except _queue.Empty: 201 break 202 # flush data 203 for filename in data_cache: 204 with open(filename, 'ab+') as f: 205 f.write(data_cache[filename]) 206 207 def run(self): 208 while True: 209 time.sleep(1) 210 self.flush_data() 211 212 213class RecvThread(threading.Thread): 214 215 CHECK_FUNCTIONS = [] # type: List[Callable] 216 """ DUT subclass can define a few check functions to process received data. """ 217 218 def __init__(self, read, dut): 219 super(RecvThread, self).__init__() 220 self.exit_event = threading.Event() 221 self.setDaemon(True) 222 self.read = read 223 self.dut = dut 224 self.data_cache = dut.data_cache 225 self.recorded_data = dut.recorded_data 226 self.record_data_lock = dut.record_data_lock 227 self._line_cache = str() 228 229 def _line_completion(self, data): 230 """ 231 Usually check functions requires to check for one complete line. 232 This method will do line completion for the first line, and strip incomplete last line. 233 """ 234 ret = self._line_cache 235 decoded_data = _decode_data(data) 236 237 # cache incomplete line to later process 238 lines = decoded_data.splitlines(True) 239 last_line = lines[-1] 240 241 if last_line[-1] != '\n': 242 if len(lines) == 1: 243 # only one line and the line is not finished, then append this to cache 244 self._line_cache += lines[-1] 245 ret = str() 246 else: 247 # more than one line and not finished, replace line cache 248 self._line_cache = lines[-1] 249 ret += ''.join(lines[:-1]) 250 else: 251 # line finishes, flush cache 252 self._line_cache = str() 253 ret += decoded_data 254 return ret 255 256 def run(self): 257 while not self.exit_event.isSet(): 258 raw_data = self.read(1000) 259 if raw_data: 260 # we need to do line completion before call check functions 261 # need to call check functions first 262 # otherwise check functions could be called after cases finished 263 comp_data = self._line_completion(raw_data) 264 for check_function in self.CHECK_FUNCTIONS: 265 check_function(self, comp_data) 266 267 with self.record_data_lock: 268 self.data_cache.put(raw_data) 269 for capture_id in self.recorded_data: 270 self.recorded_data[capture_id].put(raw_data) 271 272 def exit(self): 273 self.exit_event.set() 274 self.join() 275 276 277class BaseDUT(object): 278 """ 279 :param name: application defined name for port 280 :param port: comport name, used to create DUT port 281 :param log_file: log file name 282 :param app: test app instance 283 :param kwargs: extra args for DUT to create ports 284 """ 285 286 DEFAULT_EXPECT_TIMEOUT = 10 287 MAX_EXPECT_FAILURES_TO_SAVED = 10 288 RECV_THREAD_CLS = RecvThread 289 TARGET = None 290 """ DUT subclass can specify RECV_THREAD_CLS to do add some extra stuff when receive data. 291 For example, DUT can implement exception detect & analysis logic in receive thread subclass. """ 292 LOG_THREAD = _LogThread() 293 LOG_THREAD.start() 294 295 def __init__(self, name, port, log_file, app, **kwargs): 296 297 self.expect_lock = threading.Lock() 298 self.name = name 299 self.port = port 300 self.log_file = log_file 301 self.app = app 302 self.data_cache = _DataCache() 303 # the main process of recorded data are done in receive thread 304 # but receive thread could be closed in DUT lifetime (tool methods) 305 # so we keep it in BaseDUT, as their life cycle are same 306 self.recorded_data = dict() 307 self.record_data_lock = threading.RLock() 308 self.receive_thread = None 309 self.expect_failures = [] 310 self._port_open() 311 self.start_receive() 312 313 def __str__(self): 314 return 'DUT({}: {})'.format(self.name, str(self.port)) 315 316 def _save_expect_failure(self, pattern, data, start_time): 317 """ 318 Save expect failure. If the test fails, then it will print the expect failures. 319 In some cases, user will handle expect exceptions. 320 The expect failures could be false alarm, and test case might generate a lot of such failures. 321 Therefore, we don't print the failure immediately and limit the max size of failure list. 322 """ 323 self.expect_failures.insert(0, {'pattern': pattern, 'data': data, 324 'start': start_time, 'end': time.time()}) 325 self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED] 326 327 def _save_dut_log(self, data): 328 """ 329 Save DUT log into file using another thread. 330 This is a workaround for some devices takes long time for file system operations. 331 332 See descriptions in ``_LogThread`` for details. 333 """ 334 self.LOG_THREAD.save_log(self.log_file, data) 335 336 # define for methods need to be overwritten by Port 337 @classmethod 338 def list_available_ports(cls): 339 """ 340 list all available ports. 341 342 subclass (port) must overwrite this method. 343 344 :return: list of available comports 345 """ 346 pass 347 348 def _port_open(self): 349 """ 350 open the port. 351 352 subclass (port) must overwrite this method. 353 354 :return: None 355 """ 356 pass 357 358 def _port_read(self, size=1): 359 """ 360 read form port. This method should not blocking for long time, otherwise receive thread can not exit. 361 362 subclass (port) must overwrite this method. 363 364 :param size: max size to read. 365 :return: read data. 366 """ 367 pass 368 369 def _port_write(self, data): 370 """ 371 write to port. 372 373 subclass (port) must overwrite this method. 374 375 :param data: data to write 376 :return: None 377 """ 378 pass 379 380 def _port_close(self): 381 """ 382 close port. 383 384 subclass (port) must overwrite this method. 385 386 :return: None 387 """ 388 pass 389 390 # methods that need to be overwritten by Tool 391 @classmethod 392 def confirm_dut(cls, port, **kwargs): 393 """ 394 confirm if it's a DUT, usually used by auto detecting DUT in by Env config. 395 396 subclass (tool) must overwrite this method. 397 398 :param port: comport 399 :return: tuple of result (bool), and target (str) 400 """ 401 pass 402 403 def start_app(self): 404 """ 405 usually after we got DUT, we need to do some extra works to let App start. 406 For example, we need to reset->download->reset to let IDF application start on DUT. 407 408 subclass (tool) must overwrite this method. 409 410 :return: None 411 """ 412 pass 413 414 # methods that features raw port methods 415 def start_receive(self): 416 """ 417 Start thread to receive data. 418 419 :return: None 420 """ 421 self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self) 422 self.receive_thread.start() 423 424 def stop_receive(self): 425 """ 426 stop the receiving thread for the port 427 :return: None 428 """ 429 if self.receive_thread: 430 self.receive_thread.exit() 431 self.LOG_THREAD.flush_data() 432 self.receive_thread = None 433 434 def close(self): 435 """ 436 permanently close the port 437 """ 438 self.stop_receive() 439 self._port_close() 440 441 @staticmethod 442 def u_to_bytearray(data): 443 """ 444 if data is not bytearray then it tries to convert it 445 446 :param data: data which needs to be checked and maybe transformed 447 """ 448 if isinstance(data, type(u'')): 449 try: 450 data = data.encode('utf-8') 451 except UnicodeEncodeError as e: 452 print(u'Cannot encode {} of type {}'.format(data, type(data))) 453 raise e 454 return data 455 456 def write(self, data, eol='\r\n', flush=True): 457 """ 458 :param data: data 459 :param eol: end of line pattern. 460 :param flush: if need to flush received data cache before write data. 461 usually we need to flush data before write, 462 make sure processing outputs generated by wrote. 463 :return: None 464 """ 465 # do flush before write 466 if flush: 467 self.data_cache.flush() 468 # do write if cache 469 if data is not None: 470 self._port_write(self.u_to_bytearray(data) + self.u_to_bytearray(eol) if eol else self.u_to_bytearray(data)) 471 472 @_expect_lock 473 def read(self, size=0xFFFFFFFF): 474 """ 475 read(size=0xFFFFFFFF) 476 read raw data. NOT suggested to use this method. 477 Only use it if expect method doesn't meet your requirement. 478 479 :param size: read size. default read all data 480 :return: read data 481 """ 482 data = self.data_cache.get_data(0)[:size] 483 self.data_cache.flush(size) 484 return data 485 486 def start_capture_raw_data(self, capture_id='default'): 487 """ 488 Sometime application want to get DUT raw data and use ``expect`` method at the same time. 489 Capture methods provides a way to get raw data without affecting ``expect`` or ``read`` method. 490 491 If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID. 492 493 :param capture_id: ID of capture. You can use different IDs to do different captures at the same time. 494 """ 495 with self.record_data_lock: 496 try: 497 # if start capture on existed ID, we do flush data and restart capture 498 self.recorded_data[capture_id].flush() 499 except KeyError: 500 # otherwise, create new data cache 501 self.recorded_data[capture_id] = _DataCache() 502 503 def stop_capture_raw_data(self, capture_id='default'): 504 """ 505 Stop capture and get raw data. 506 This method should be used after ``start_capture_raw_data`` on the same capture ID. 507 508 :param capture_id: ID of capture. 509 :return: captured raw data between start capture and stop capture. 510 """ 511 with self.record_data_lock: 512 try: 513 ret = self.recorded_data[capture_id].get_data() 514 self.recorded_data.pop(capture_id) 515 except KeyError as e: 516 e.message = 'capture_id does not exist. ' \ 517 'You should call start_capture_raw_data with same ID ' \ 518 'before calling stop_capture_raw_data' 519 raise e 520 return ret 521 522 # expect related methods 523 524 @staticmethod 525 def _expect_str(data, pattern): 526 """ 527 protected method. check if string is matched in data cache. 528 529 :param data: data to process 530 :param pattern: string 531 :return: pattern if match succeed otherwise None 532 """ 533 index = data.find(pattern) 534 if index != -1: 535 ret = pattern 536 index += len(pattern) 537 else: 538 ret = None 539 return ret, index 540 541 @staticmethod 542 def _expect_re(data, pattern): 543 """ 544 protected method. check if re pattern is matched in data cache 545 546 :param data: data to process 547 :param pattern: compiled RegEx pattern 548 :return: match groups if match succeed otherwise None 549 """ 550 ret = None 551 if isinstance(pattern.pattern, bytes): 552 pattern = re.compile(_decode_data(pattern.pattern)) 553 match = pattern.search(data) 554 if match: 555 ret = tuple(x for x in match.groups()) 556 index = match.end() 557 else: 558 index = -1 559 return ret, index 560 561 EXPECT_METHOD = [ 562 [type(re.compile('')), '_expect_re'], 563 [type(b''), '_expect_str'], # Python 2 & 3 hook to work without 'from builtins import str' from future 564 [type(u''), '_expect_str'], 565 ] 566 567 def _get_expect_method(self, pattern): 568 """ 569 protected method. get expect method according to pattern type. 570 571 :param pattern: expect pattern, string or compiled RegEx 572 :return: ``_expect_str`` or ``_expect_re`` 573 """ 574 for expect_method in self.EXPECT_METHOD: 575 if isinstance(pattern, expect_method[0]): 576 method = expect_method[1] 577 break 578 else: 579 raise UnsupportedExpectItem() 580 return self.__getattribute__(method) 581 582 @_expect_lock 583 def expect(self, pattern, timeout=DEFAULT_EXPECT_TIMEOUT, full_stdout=False): 584 """ 585 expect(pattern, timeout=DEFAULT_EXPECT_TIMEOUT) 586 expect received data on DUT match the pattern. will raise exception when expect timeout. 587 588 :raise ExpectTimeout: failed to find the pattern before timeout 589 :raise UnsupportedExpectItem: pattern is not string or compiled RegEx 590 591 :param pattern: string or compiled RegEx(string pattern) 592 :param timeout: timeout for expect 593 :param full_stdout: return full stdout until meet expect string/pattern or just matched string 594 :return: string if pattern is string; matched groups if pattern is RegEx 595 """ 596 method = self._get_expect_method(pattern) 597 stdout = '' 598 599 # non-blocking get data for first time 600 data = self.data_cache.get_data(0) 601 start_time = time.time() 602 while True: 603 ret, index = method(data, pattern) 604 if ret is not None: 605 stdout = data[:index] 606 self.data_cache.flush(index) 607 break 608 time_remaining = start_time + timeout - time.time() 609 if time_remaining < 0: 610 break 611 # wait for new data from cache 612 data = self.data_cache.get_data(time_remaining) 613 614 if ret is None: 615 pattern = _pattern_to_string(pattern) 616 self._save_expect_failure(pattern, data, start_time) 617 raise ExpectTimeout(self.name + ': ' + pattern) 618 return stdout if full_stdout else ret 619 620 def _expect_multi(self, expect_all, expect_item_list, timeout): 621 """ 622 protected method. internal logical for expect multi. 623 624 :param expect_all: True or False, expect all items in the list or any in the list 625 :param expect_item_list: expect item list 626 :param timeout: timeout 627 :return: None 628 """ 629 def process_expected_item(item_raw): 630 # convert item raw data to standard dict 631 item = { 632 'pattern': item_raw[0] if isinstance(item_raw, tuple) else item_raw, 633 'method': self._get_expect_method(item_raw[0] if isinstance(item_raw, tuple) 634 else item_raw), 635 'callback': item_raw[1] if isinstance(item_raw, tuple) else None, 636 'index': -1, 637 'ret': None, 638 } 639 return item 640 641 expect_items = [process_expected_item(x) for x in expect_item_list] 642 643 # non-blocking get data for first time 644 data = self.data_cache.get_data(0) 645 646 start_time = time.time() 647 matched_expect_items = list() 648 while True: 649 for expect_item in expect_items: 650 if expect_item not in matched_expect_items: 651 # exclude those already matched 652 expect_item['ret'], expect_item['index'] = \ 653 expect_item['method'](data, expect_item['pattern']) 654 if expect_item['ret'] is not None: 655 # match succeed for one item 656 matched_expect_items.append(expect_item) 657 658 # if expect all, then all items need to be matched, 659 # else only one item need to matched 660 if expect_all: 661 match_succeed = len(matched_expect_items) == len(expect_items) 662 else: 663 match_succeed = True if matched_expect_items else False 664 665 time_remaining = start_time + timeout - time.time() 666 if time_remaining < 0 or match_succeed: 667 break 668 else: 669 data = self.data_cache.get_data(time_remaining) 670 671 if match_succeed: 672 # sort matched items according to order of appearance in the input data, 673 # so that the callbacks are invoked in correct order 674 matched_expect_items = sorted(matched_expect_items, key=lambda it: it['index']) 675 # invoke callbacks and flush matched data cache 676 slice_index = -1 677 for expect_item in matched_expect_items: 678 # trigger callback 679 if expect_item['callback']: 680 expect_item['callback'](expect_item['ret']) 681 slice_index = max(slice_index, expect_item['index']) 682 # flush already matched data 683 self.data_cache.flush(slice_index) 684 else: 685 pattern = str([_pattern_to_string(x['pattern']) for x in expect_items]) 686 self._save_expect_failure(pattern, data, start_time) 687 raise ExpectTimeout(self.name + ': ' + pattern) 688 689 @_expect_lock 690 def expect_any(self, *expect_items, **timeout): 691 """ 692 expect_any(*expect_items, timeout=DEFAULT_TIMEOUT) 693 expect any of the patterns. 694 will call callback (if provided) if pattern match succeed and then return. 695 will pass match result to the callback. 696 697 :raise ExpectTimeout: failed to match any one of the expect items before timeout 698 :raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx 699 700 :arg expect_items: one or more expect items. 701 string, compiled RegEx pattern or (string or RegEx(string pattern), callback) 702 :keyword timeout: timeout for expect 703 :return: None 704 """ 705 # to be compatible with python2 706 # in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT) 707 if 'timeout' not in timeout: 708 timeout['timeout'] = self.DEFAULT_EXPECT_TIMEOUT 709 return self._expect_multi(False, expect_items, **timeout) 710 711 @_expect_lock 712 def expect_all(self, *expect_items, **timeout): 713 """ 714 expect_all(*expect_items, timeout=DEFAULT_TIMEOUT) 715 expect all of the patterns. 716 will call callback (if provided) if all pattern match succeed and then return. 717 will pass match result to the callback. 718 719 :raise ExpectTimeout: failed to match all of the expect items before timeout 720 :raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx 721 722 :arg expect_items: one or more expect items. 723 string, compiled RegEx pattern or (string or RegEx(string pattern), callback) 724 :keyword timeout: timeout for expect 725 :return: None 726 """ 727 # to be compatible with python2 728 # in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT) 729 if 'timeout' not in timeout: 730 timeout['timeout'] = self.DEFAULT_EXPECT_TIMEOUT 731 return self._expect_multi(True, expect_items, **timeout) 732 733 @staticmethod 734 def _format_ts(ts): 735 return '{}:{}'.format(time.strftime('%m-%d %H:%M:%S', time.localtime(ts)), str(ts % 1)[2:5]) 736 737 def print_debug_info(self): 738 """ 739 Print debug info of current DUT. Currently we will print debug info for expect failures. 740 """ 741 Utility.console_log('DUT debug info for DUT: {}:'.format(self.name), color='orange') 742 743 for failure in self.expect_failures: 744 Utility.console_log(u'\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n' 745 .format(failure['pattern'], failure['data'], 746 self._format_ts(failure['start']), self._format_ts(failure['end'])), 747 color='orange') 748 749 750class SerialDUT(BaseDUT): 751 """ serial with logging received data feature """ 752 753 DEFAULT_UART_CONFIG = { 754 'baudrate': 115200, 755 'bytesize': serial.EIGHTBITS, 756 'parity': serial.PARITY_NONE, 757 'stopbits': serial.STOPBITS_ONE, 758 'timeout': 0.05, 759 'xonxoff': False, 760 'rtscts': False, 761 } 762 763 def __init__(self, name, port, log_file, app, **kwargs): 764 self.port_inst = None 765 self.serial_configs = self.DEFAULT_UART_CONFIG.copy() 766 for uart_config_name in self.serial_configs.keys(): 767 if uart_config_name in kwargs: 768 self.serial_configs[uart_config_name] = kwargs[uart_config_name] 769 super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs) 770 771 def _format_data(self, data): 772 """ 773 format data for logging. do decode and add timestamp. 774 775 :param data: raw data from read 776 :return: formatted data (str) 777 """ 778 timestamp = '[{}]'.format(self._format_ts(time.time())) 779 formatted_data = timestamp.encode() + b'\r\n' + data + b'\r\n' 780 return formatted_data 781 782 def _port_open(self): 783 self.port_inst = serial.serial_for_url(self.port, **self.serial_configs) 784 785 def _port_close(self): 786 self.port_inst.close() 787 788 def _port_read(self, size=1): 789 data = self.port_inst.read(size) 790 if data: 791 self._save_dut_log(self._format_data(data)) 792 return data 793 794 def _port_write(self, data): 795 if isinstance(data, str): 796 data = data.encode() 797 self.port_inst.write(data) 798 799 @classmethod 800 def list_available_ports(cls): 801 return [x.device for x in list_ports.comports()] 802