1""" 2The test script will only run the one test on micropython. It is not a 3framework that can be used for other tests. This has a reduced code footprint. 4entrypoint API is almost the same as the framework. 5 6 7Requirements 8Python >= 3.10 9 10pillow library 11 12add the following to the CI prior to the test running 13 14python3 -m pip install pillow 15 16Example command line to run the test. I suggest doing this from the root of the 17binding directory. It is just a simple location to do it from. 18 19Paths that are passed in MUST be relative to the current working directory. 20python3 lib/lv_bindings/lvgl/tests/micropy_test/__init__.py --artifact-path=lib/lv_bindings/lvgl/tests/micropy_test/artifacts --mpy-path=ports/unix/build-standard/micropython 21 22""" 23import os 24import time 25import signal 26import argparse 27import binascii 28import unittest 29import threading 30import subprocess 31 32from PIL import Image as Image 33 34 35DEBUG = 0 36 37debug_log = None 38 39saved_test_data = [] 40 41 42def format_error_data(data): 43 output = '' 44 for line in data.split('\n'): 45 output += f'\033[31;1m{line}\033[0m\n' 46 return output 47 48 49def log(*args): 50 args = ' '.join(repr(arg) for arg in args) 51 debug_log.write(args + '\n') 52 53 if DEBUG: 54 sys.stdout.write('\033[31;1m' + args + '\033[0m\n') 55 sys.stdout.flush() 56 57 58BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__))) 59TEST_PATH = os.path.join(BASE_PATH, 'micropy.py') 60IMG_PATH = os.path.join(BASE_PATH, '../ref_imgs/binding.png') 61 62CTRL_C = b'\x03' # 2 times to exit any running code 63CTRL_D = b'\x04' # exit paste mode committing and running what has been pasted 64CTRL_E = b'\x05' # enter paste mode 65 66PASTE_PROMPT = b'===' 67REPL_PROMPT = b'>>>' 68 69os.environ['MICROPYINSPECT'] = '1' 70 71 72class TestData: 73 74 def __init__(self): 75 self.watchdog_timer = time.time() 76 self.result = None 77 self.error_data = '' 78 self.event = threading.Event() 79 80 81class MicroPython_Test(unittest.TestCase): 82 # these are here simply to make an IDE happy. Their values get dynamically 83 # set when the class gets constructed 84 process: subprocess.Popen = None 85 exit_event: threading.Event = None 86 87 @classmethod 88 def send(cls, cmd): 89 if cls.process is None: 90 return 91 92 log('<---', cmd) 93 cls.process.stdin.write(cmd) 94 cls.process.stdin.flush() 95 96 @classmethod 97 def read_until(cls, marker): 98 micropy_data = b'' 99 error_data = b'' 100 101 log('MARKER', marker) 102 103 logged = False 104 105 while ( 106 not micropy_data.endswith(marker) and 107 not cls.exit_event.is_set() 108 ): 109 try: 110 char = cls.process.stdout.read(1) 111 except: # NOQA 112 break 113 114 if char: 115 micropy_data += char 116 logged = False 117 else: 118 logged = True 119 log('--->', micropy_data) 120 121 if b'\nERROR END\n' in micropy_data: 122 error_data = micropy_data.split(b'\nERROR START\n')[-1].split(b'\nERROR END\n')[0] 123 micropy_data = b'' 124 try: 125 log('---> ERROR: ', error_data.decode('utf-8')) 126 except UnicodeDecodeError: 127 log('---> ERROR: ', error_data) 128 129 logged = True 130 break 131 132 if not logged: 133 log('--->', micropy_data) 134 135 if b'\nERROR END\n' in micropy_data: 136 error_data = micropy_data.split(b'\nERROR START\n')[-1].split(b'\nERROR END\n')[0] 137 micropy_data = b'' 138 try: 139 log('---> ERROR: ', error_data.decode('utf-8')) 140 except UnicodeDecodeError: 141 log('---> ERROR: ', error_data) 142 143 if cls.exit_event.is_set(): 144 log('--EXIT EVENT SET') 145 146 saved_test_data.append((micropy_data, error_data)) 147 148 return micropy_data.replace(marker, b''), error_data 149 150 @classmethod 151 def setUpClass(cls): 152 os.chdir(os.path.dirname(__file__)) 153 cls.exit_event = threading.Event() 154 155 log(f'--SETTING UP') 156 cls.process = subprocess.Popen( 157 ['bash'], 158 stdout=subprocess.PIPE, 159 stderr=subprocess.PIPE, 160 stdin=subprocess.PIPE, 161 env=os.environ, 162 shell=True, 163 preexec_fn=os.setsid 164 ) 165 log(f'--RUNNING MICROPYTHON ({MICROPYTHON_PATH})') 166 167 cls.send(b'cd ' + os.path.dirname(__file__).encode('utf-8') + b'\n') 168 cls.send(MICROPYTHON_PATH.encode('utf-8') + b'\n') 169 _, error_data = cls.read_until(b'>>>') 170 171 if error_data: 172 raise RuntimeError(error_data) 173 174 log('--MICROPYTHON STARTED') 175 176 @classmethod 177 def tearDownClass(cls): 178 log('--TEARDOWN STARTED') 179 180 if cls.process is not None: 181 cls.send(CTRL_C) 182 cls.send(CTRL_C) 183 cls.send(CTRL_D) 184 185 if not cls.process.stdin.closed: 186 cls.process.stdin.close() 187 188 os.killpg(os.getpgid(cls.process.pid), signal.SIGTERM) 189 190 cls.process.wait() 191 192 if not cls.process.stdout.closed: 193 cls.process.stdout.close() 194 195 if not cls.process.stderr.closed: 196 cls.process.stderr.close() 197 198 cls.process = None 199 200 log(f'--TEARDOWN FINISHED') 201 202 def test_1_image_compare(self): 203 image = Image.open(IMG_PATH) 204 res_img = image.convert('RGB') 205 image.close() 206 res_data = list(res_img.getdata()) 207 res_img.close() 208 209 with open(TEST_PATH, 'rb') as f: 210 test_code = f.read() 211 212 log(f'--RUNNING TEST ({TEST_PATH})') 213 214 test_code = test_code.strip() 215 216 if self.__class__.process is None: 217 self.fail('MicroPython failure.') 218 219 self.send(CTRL_E) 220 221 _, error_data = self.read_until(PASTE_PROMPT) 222 if error_data: 223 self.fail(error_data) 224 225 test_code = test_code.split(b'\r\n') 226 227 for i, line in enumerate(test_code): 228 self.send(line + b'\n') 229 _, error_data = self.read_until(b'\n') 230 231 if error_data: 232 self.fail(error_data) 233 234 time.sleep(0.002) 235 236 # self.read_until(b'# end\n') 237 238 def _do(td: TestData): 239 self.send(CTRL_D) 240 td.error_data = b'' 241 td.watchdog_timer = time.time() 242 243 td.result = [] 244 245 try: 246 _, td.error_data = self.read_until(b'FRAME START\n') 247 td.watchdog_timer = time.time() 248 249 lne, td.error_data = self.read_until(b'\n') 250 251 while ( 252 b'FRAME END' not in lne and 253 not td.error_data and 254 not self.__class__.exit_event.is_set() 255 ): 256 td.watchdog_timer = time.time() 257 td.result.append(lne) 258 lne, td.error_data = self.read_until(b'\n') 259 260 if td.error_data: 261 return 262 263 if self.__class__.exit_event.is_set(): 264 return 265 266 except: # NOQA 267 import traceback 268 269 traceback.print_exc() 270 271 td.error_data = traceback.format_exc() 272 return 273 274 td.event.set() 275 276 test_data = TestData() 277 278 t = threading.Thread( 279 target=_do, 280 args=(test_data,) 281 ) 282 t.daemon = True 283 test_data.watchdog_timer = time.time() 284 285 t.start() 286 287 while ( 288 (time.time() - test_data.watchdog_timer) * 1000 <= 20000 and 289 not test_data.event.is_set() 290 ): 291 test_data.event.wait(0.05) 292 293 if not test_data.event.is_set(): 294 self.__class__.exit_event.set() 295 # self.read_until(REPL_PROMPT) 296 297 self.send(CTRL_C) 298 self.send(CTRL_C) 299 300 width = 800 301 height = 480 302 303 if test_data.error_data: 304 self.fail(test_data.error_data) 305 306 try: 307 frame = bytearray( 308 b''.join(binascii.unhexlify(lne) for lne in test_data.result) 309 ) 310 311 # I don't exactly know why the byte order is backwards but it is 312 frame = bytes(bytearray([ 313 item for j in range(0, len(frame), 3) 314 for item in [frame[j + 2], frame[j + 1], frame[j]] 315 ])) 316 317 image = Image.new('RGB', (width, height)) 318 image.frombytes(frame) 319 img = image.convert('RGB') 320 image.close() 321 322 byte_data = list(img.getdata()) 323 img.save(os.path.join(ARTIFACT_PATH, f'frame.png'), 'PNG') 324 img.close() 325 326 with open(os.path.join(ARTIFACT_PATH, f'frame.bin'), 'wb') as f: 327 # have to flatten the data and remove the alpha 328 # from the PIL image it is formatted as 329 # [(r, g, b), (r, g, b)] 330 f.write(bytes(bytearray([ 331 item for sublist in byte_data 332 for item in sublist 333 ]))) 334 335 except binascii.Error: 336 error = [] 337 for line, err in saved_test_data: 338 if err: 339 try: 340 error.append(err.decode('utf-8')) 341 except UnicodeDecodeError: 342 error.append(str(err)) 343 344 error = '\n'.join(error) 345 if error: 346 self.fail(format_error_data(error)) 347 else: 348 self.fail(b'\n'.join(test_data.result)) 349 350 except: # NOQA 351 import traceback 352 353 self.fail(traceback.format_exc()) 354 355 self.assertEqual(res_data, byte_data, 'Frames do not match') 356 357 358cwd = os.path.abspath(os.getcwd()) 359 360ARTIFACT_PATH = os.path.join(cwd, 'artifacts') 361MICROPYTHON_PATH = os.path.join(cwd, 'micropython') 362 363 364if __name__ == '__main__': 365 import sys 366 367 arg_parser = argparse.ArgumentParser() 368 arg_parser.add_argument( 369 '--artifact-path', 370 dest='artifact_path', 371 help='path to save artifacts to', 372 action='store' 373 ) 374 arg_parser.add_argument( 375 '--mpy-path', 376 dest='mpy_path', 377 help='path to micropython', 378 action='store' 379 ) 380 arg_parser.add_argument( 381 '--debug', 382 dest='debug', 383 help='debug output', 384 action='store_true' 385 ) 386 387 args = arg_parser.parse_args() 388 389 ARTIFACT_PATH = os.path.join(cwd, args.artifact_path) 390 MICROPYTHON_PATH = os.path.join(cwd, args.mpy_path) 391 DEBUG = args.debug 392 393 if not os.path.exists(ARTIFACT_PATH): 394 raise RuntimeError(f'Artifact path does not exist ({ARTIFACT_PATH})') 395 396 if not os.path.exists(MICROPYTHON_PATH): 397 raise RuntimeError(f'MicroPython binary not found ({MICROPYTHON_PATH})') 398 399 debug_log_path = os.path.join(ARTIFACT_PATH, 'debug.log') 400 debug_log = open(debug_log_path, 'w') 401 402 unittest.main(argv=[sys.argv[0], '-v']) 403 404 debug_log.close() 405 print(f'View the debug output in "{debug_log_path}"') 406