1from __future__ import unicode_literals 2 3import os 4import threading 5import time 6 7import serial 8import ttfw_idf 9from tiny_test_fw import Utility 10 11 12class SerialThread(object): 13 ''' 14 Connect to serial port and fake responses just like from a real modem 15 ''' 16 17 # Dictionary for transforming received AT command to expected response 18 AT_FSM = {b'AT+CGMM': b'0G Dummy Model', 19 b'AT+CGSN': b'0123456789', 20 b'AT+CIMI': b'ESP', 21 b'AT+COPS?': b'+COPS: 0,0,"ESP Network"', 22 b'AT+CSQ': b'+CSQ: 4,0', 23 b'AT+CBC': b'+CBC: 0,50', 24 b'ATD*99***1#': b'CONNECT', 25 } 26 27 def run(self, log_path, exit_event): 28 with serial.Serial(self.port, 115200) as ser, open(log_path, 'w') as f: 29 buff = b'' 30 while not exit_event.is_set(): 31 time.sleep(0.1) 32 buff += ser.read(ser.in_waiting) 33 if not buff.endswith(b'\r'): 34 continue # read more because the complete command wasn't yet received 35 cmd_list = buff.split(b'\r') 36 buff = b'' 37 for cmd in cmd_list: 38 if len(cmd) == 0: 39 continue 40 snd = self.AT_FSM.get(cmd, b'') 41 if snd != b'': 42 snd += b'\n' 43 snd += b'OK\n' 44 f.write('Received: {}\n'.format(repr(cmd.decode()))) 45 f.write('Sent: {}\n'.format(repr(snd.decode()))) 46 ser.write(snd) 47 48 def __init__(self, port, log_path): 49 self.port = port 50 self.exit_event = threading.Event() 51 self.t = threading.Thread(target=self.run, args=(log_path, self.exit_event,)) 52 self.t.start() 53 54 def __enter__(self): 55 return self 56 57 def __exit__(self, type, value, traceback): 58 self.exit_event.set() 59 self.t.join(60) 60 if self.t.is_alive(): 61 Utility.console_log('The serial thread is still alive', 'O') 62 63 64@ttfw_idf.idf_example_test(env_tag='Example_PPP') 65def test_examples_pppos_client(env, extra_data): 66 67 rel_project_path = 'examples/protocols/pppos_client' 68 dut = env.get_dut('pppos_client', rel_project_path) 69 project_path = os.path.join(dut.app.get_sdk_path(), rel_project_path) 70 71 modem_port = '/dev/ttyUSB{}'.format(0 if dut.port.endswith('1') else 1) 72 73 with SerialThread(modem_port, os.path.join(project_path, 'serial.log')): 74 dut.start_app() 75 76 dut.expect_all('pppos_example: Module: 0G Dummy Model', 77 'pppos_example: Operator: "ESP Network"', 78 'pppos_example: IMEI: 0123456789', 79 'pppos_example: IMSI: ESP', 80 'pppos_example: rssi: 4, ber: 0', 81 'pppos_example: Battery voltage: 0 mV', 82 'pppos_example: Modem PPP Started', 83 timeout=60) 84 85 cmd = ('pppd {} 115200 10.0.0.1:10.0.0.2 logfile {} local noauth debug nocrtscts nodetach +ipv6' 86 ''.format(modem_port, os.path.join(project_path, 'ppp.log'))) 87 with ttfw_idf.CustomProcess(cmd, '/dev/null'): # Nothing is printed here 88 dut.expect_all('pppos_example: Modem Connect to PPP Server', 89 'pppos_example: IP : 10.0.0.2', 90 'pppos_example: Netmask : 255.255.255.255', 91 'pppos_example: Gateway : 10.0.0.1', 92 'pppos_example: Name Server1: 0.0.0.0', 93 'pppos_example: Name Server2: 0.0.0.0', 94 'pppos_example: GOT ip event!!!', 95 'pppos_example: MQTT other event id: 7', 96 # There are no fake DNS server and MQTT server set up so the example fails at this point 97 'esp-tls: couldn\'t get hostname', 98 'MQTT_CLIENT: Error transport connect', 99 'pppos_example: MQTT_EVENT_ERROR', 100 'pppos_example: MQTT_EVENT_DISCONNECTED') 101 102 103if __name__ == '__main__': 104 test_examples_pppos_client() 105