1import os 2import re 3import socket 4import struct 5import sys 6import time 7from threading import Thread 8 9import ttfw_idf 10from tiny_test_fw import DUT 11 12msgid = -1 13 14 15def get_my_ip(): 16 s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 17 s1.connect(('8.8.8.8', 80)) 18 my_ip = s1.getsockname()[0] 19 s1.close() 20 return my_ip 21 22 23def mqqt_server_sketch(my_ip, port): 24 global msgid 25 print('Starting the server on {}'.format(my_ip)) 26 s = None 27 try: 28 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 29 s.settimeout(60) 30 s.bind((my_ip, port)) 31 s.listen(1) 32 q,addr = s.accept() 33 q.settimeout(30) 34 print('connection accepted') 35 except Exception: 36 print('Local server on {}:{} listening/accepting failure: {}' 37 'Possibly check permissions or firewall settings' 38 'to accept connections on this address'.format(my_ip, port, sys.exc_info()[0])) 39 raise 40 data = q.recv(1024) 41 # check if received initial empty message 42 print('received from client {}'.format(data)) 43 data = bytearray([0x20, 0x02, 0x00, 0x00]) 44 q.send(data) 45 # try to receive qos1 46 data = q.recv(1024) 47 msgid = struct.unpack('>H', data[15:17])[0] 48 print('received from client {}, msgid: {}'.format(data, msgid)) 49 data = bytearray([0x40, 0x02, data[15], data[16]]) 50 q.send(data) 51 time.sleep(5) 52 s.close() 53 print('server closed') 54 55 56@ttfw_idf.idf_example_test(env_tag='Example_EthKitV1') 57def test_examples_protocol_mqtt_qos1(env, extra_data): 58 global msgid 59 """ 60 steps: (QoS1: Happy flow) 61 1. start the broker broker (with correctly sending ACK) 62 2. DUT client connects to a broker and publishes qos1 message 63 3. Test evaluates that qos1 message is queued and removed from queued after ACK received 64 4. Test the broker received the same message id evaluated in step 3 65 """ 66 dut1 = env.get_dut('mqtt_tcp', 'examples/protocols/mqtt/tcp', dut_class=ttfw_idf.ESP32DUT) 67 # check and log bin size 68 binary_file = os.path.join(dut1.app.binary_path, 'mqtt_tcp.bin') 69 bin_size = os.path.getsize(binary_file) 70 ttfw_idf.log_performance('mqtt_tcp_bin_size', '{}KB'.format(bin_size // 1024)) 71 # 1. start mqtt broker sketch 72 host_ip = get_my_ip() 73 thread1 = Thread(target=mqqt_server_sketch, args=(host_ip,1883)) 74 thread1.start() 75 # 2. start the dut test and wait till client gets IP address 76 dut1.start_app() 77 # waiting for getting the IP address 78 try: 79 ip_address = dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30) 80 print('Connected to AP with IP: {}'.format(ip_address)) 81 except DUT.ExpectTimeout: 82 raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') 83 84 print('writing to device: {}'.format('mqtt://' + host_ip + '\n')) 85 dut1.write('mqtt://' + host_ip + '\n') 86 thread1.join() 87 print('Message id received from server: {}'.format(msgid)) 88 # 3. check the message id was enqueued and then deleted 89 msgid_enqueued = dut1.expect(re.compile(r'OUTBOX: ENQUEUE msgid=([0-9]+)'), timeout=30) 90 msgid_deleted = dut1.expect(re.compile(r'OUTBOX: DELETED msgid=([0-9]+)'), timeout=30) 91 # 4. check the msgid of received data are the same as that of enqueued and deleted from outbox 92 if (msgid_enqueued[0] == str(msgid) and msgid_deleted[0] == str(msgid)): 93 print('PASS: Received correct msg id') 94 else: 95 print('Failure!') 96 raise ValueError('Mismatch of msgid: received: {}, enqueued {}, deleted {}'.format(msgid, msgid_enqueued, msgid_deleted)) 97 98 99if __name__ == '__main__': 100 test_examples_protocol_mqtt_qos1() 101