1"""
This script runs a single test on MicroPython. It is not a framework that
can be used for other tests, which keeps its code footprint small. The
entry-point API is almost the same as the framework's.
5
6
7Requirements
8Python >= 3.10
9
10pillow library
11
12add the following to the CI prior to the test running
13
14python3 -m pip install pillow
15
16Example command line to run the test. I suggest doing this from the root of the
17binding directory. It is just a simple location to do it from.
18
19Paths that are passed in MUST be relative to the current working directory.
20python3 lib/lv_bindings/lvgl/tests/micropy_test/__init__.py --artifact-path=lib/lv_bindings/lvgl/tests/micropy_test/artifacts --mpy-path=ports/unix/build-standard/micropython
21
22"""
import os
import sys
import time
import shlex
import signal
import argparse
import binascii
import unittest
import threading
import traceback
import subprocess

from PIL import Image as Image
33
34
# When truthy, log() also echoes every message to stdout. The __main__
# section overwrites this with the value of the --debug flag.
DEBUG = 0

# File object that log() writes to. It stays None until the __main__ section
# opens "debug.log" inside the artifact directory.
debug_log = None

# One (micropy_data, error_data) tuple per MicroPython_Test.read_until() call.
# The test scans it for error output when the frame data cannot be decoded.
saved_test_data = []
40
41
def format_error_data(data):
    """Return *data* with every line wrapped in ANSI bold-red escape codes.

    :param data: text to colourise; it is split on ``'\\n'``.
    :return: the coloured lines, each one terminated by a newline.
    """
    return ''.join(
        f'\033[31;1m{line}\033[0m\n'
        for line in data.split('\n')
    )
47
48
def log(*args):
    """Record a debug message built from the ``repr()`` of every argument.

    The message is appended to ``debug_log`` when a log file has been opened
    and, when ``DEBUG`` is truthy, echoed to stdout in bold red.

    ``debug_log`` is only opened by the ``__main__`` section, so it is still
    ``None`` when this module is imported by another test runner. Logging must
    not crash the test in that case, so the file write is skipped.

    :param args: objects to log; they are joined with single spaces.
    """
    message = ' '.join(repr(arg) for arg in args)

    if debug_log is not None:
        debug_log.write(message + '\n')

    if DEBUG:
        sys.stdout.write('\033[31;1m' + message + '\033[0m\n')
        sys.stdout.flush()
56
57
# Directory that holds this file; the MicroPython test script sits next to it
# and the reference image one level up in "ref_imgs".
BASE_PATH = os.path.abspath(os.path.dirname(__file__))
TEST_PATH = os.path.join(BASE_PATH, 'micropy.py')
IMG_PATH = os.path.join(BASE_PATH, '../ref_imgs/binding.png')

# Control characters sent to the MicroPython REPL.
CTRL_C = b'\x03'  # 2 times to exit any running code
CTRL_D = b'\x04'  # exit paste mode committing and running what has been pasted
CTRL_E = b'\x05'  # enter paste mode

# Prompts the REPL prints in paste mode and in normal mode.
PASTE_PROMPT = b'==='
REPL_PROMPT = b'>>>'

# Exported to the subprocess through os.environ.
# NOTE(review): presumably keeps MicroPython in the REPL after running code,
# confirm against the MicroPython unix port documentation.
os.environ['MICROPYINSPECT'] = '1'
70
71
class TestData:
    """State shared between the test method and its reader thread."""

    def __init__(self):
        # Set by the reader thread once a complete frame has been collected.
        self.event = threading.Event()

        # Error output captured while reading; empty when nothing went wrong.
        self.error_data = ''

        # Lines of frame data; stays None until the reader thread starts.
        self.result = None

        # time.time() of the reader's last progress, polled by the test to
        # detect a stalled MicroPython process.
        self.watchdog_timer = time.time()
79
80
class MicroPython_Test(unittest.TestCase):
    """Render one frame in MicroPython and compare it with a reference image.

    ``setUpClass`` starts a ``bash`` subprocess and launches the MicroPython
    binary inside it. The test pastes the script at ``TEST_PATH`` into the
    REPL and reads back the frame that the script prints between
    ``FRAME START`` and ``FRAME END`` as lines of hex text. The decoded frame
    is saved to ``ARTIFACT_PATH`` and compared with the image at ``IMG_PATH``.
    """

    # Placeholders that keep an IDE happy; setUpClass() assigns the real
    # objects before any test runs.
    process: subprocess.Popen = None
    exit_event: threading.Event = None

    # Size in pixels of the frame dumped by the MicroPython script.
    FRAME_WIDTH = 800
    FRAME_HEIGHT = 480

    # Seconds the reader thread may go without progress before the test
    # stops waiting for the frame.
    WATCHDOG_TIMEOUT = 20.0

    # Delimiters of an error report in the MicroPython output.
    _ERROR_START = b'\nERROR START\n'
    _ERROR_END = b'\nERROR END\n'

    @staticmethod
    def _as_text(data):
        """Return *data* as ``str`` so it reads well in a failure message."""
        if isinstance(data, (bytes, bytearray)):
            return bytes(data).decode('utf-8', errors='replace')
        return str(data)

    @classmethod
    def _saved_errors(cls):
        """Return every error captured by read_until(), one per line."""
        return '\n'.join(
            cls._as_text(err) for _, err in saved_test_data if err
        )

    @classmethod
    def send(cls, cmd):
        """Write *cmd* (bytes) to the shell's stdin and flush it.

        Does nothing when no process is running.
        """
        if cls.process is None:
            return

        log('<---', cmd)
        cls.process.stdin.write(cmd)
        cls.process.stdin.flush()

    @classmethod
    def read_until(cls, marker):
        """Read the process output byte by byte until *marker* is seen.

        Reading also stops when the output contains a complete error report
        (``ERROR START`` ... ``ERROR END``), when ``exit_event`` is set, or
        when the output stream ends or cannot be read. The last two cases are
        reported through *error_data* so callers fail instead of waiting for
        output that can never arrive.

        :param marker: byte sequence that ends the read.
        :return: ``(data, error_data)``. *data* is the output that was read
            with the trailing *marker* removed (empty when an error report
            was found); *error_data* is empty when no error occurred.
        """
        buffer = bytearray()
        error_data = b''
        error_logged = False

        log('MARKER', marker)

        while (
            not buffer.endswith(marker) and
            not cls.exit_event.is_set()
        ):
            try:
                char = cls.process.stdout.read(1)
            except Exception as exc:  # e.g. the pipe was closed under us
                error_data = (
                    b'Failed to read MicroPython output: ' +
                    repr(exc).encode('utf-8')
                )
                break

            if not char:
                # A blocking pipe only returns b'' at end-of-file, so looping
                # again would spin forever without ever seeing the marker.
                error_data = b'MicroPython output ended unexpectedly'
                break

            buffer += char

            # The buffer is checked after every byte, so an error report is
            # complete exactly when the buffer ends with its end delimiter.
            if buffer.endswith(cls._ERROR_END):
                error_data = (
                    bytes(buffer)
                    .split(cls._ERROR_START)[-1]
                    .split(cls._ERROR_END)[0]
                )
                buffer.clear()
                try:
                    log('---> ERROR: ', error_data.decode('utf-8'))
                except UnicodeDecodeError:
                    log('---> ERROR: ', error_data)

                error_logged = True
                break

        micropy_data = bytes(buffer)

        if not error_logged:
            log('--->', micropy_data)
            if error_data:
                log('---> ERROR: ', error_data)

        if cls.exit_event.is_set():
            log('--EXIT EVENT SET')

        saved_test_data.append((micropy_data, error_data))

        return micropy_data.removesuffix(marker), error_data

    @classmethod
    def setUpClass(cls):
        """Start a shell, launch MicroPython in it and wait for the prompt.

        :raises RuntimeError: if MicroPython reports an error or its output
            ends before the REPL prompt appears.
        """
        test_dir = os.path.dirname(os.path.abspath(__file__))
        os.chdir(test_dir)
        cls.exit_event = threading.Event()

        log('--SETTING UP')
        # start_new_session puts the shell in its own session and process
        # group so that tearDownClass() can signal the shell and MicroPython
        # together with killpg().
        # NOTE(review): stderr is piped but never read; a child that writes
        # a lot to stderr could block once the pipe buffer fills up.
        cls.process = subprocess.Popen(
            ['bash'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdin=subprocess.PIPE,
            env=os.environ,
            start_new_session=True
        )
        log(f'--RUNNING MICROPYTHON ({MICROPYTHON_PATH})')

        try:
            # Quote both paths so that spaces or shell metacharacters in them
            # cannot split or alter the commands typed into the shell.
            cls.send(b'cd ' + os.fsencode(shlex.quote(test_dir)) + b'\n')
            cls.send(os.fsencode(shlex.quote(MICROPYTHON_PATH)) + b'\n')
            _, error_data = cls.read_until(REPL_PROMPT)

            if error_data:
                raise RuntimeError(cls._as_text(error_data))
        except BaseException:
            # unittest does not call tearDownClass() when setUpClass() fails,
            # so stop the shell here instead of leaking it.
            cls.tearDownClass()
            raise

        log('--MICROPYTHON STARTED')

    @classmethod
    def tearDownClass(cls):
        """Interrupt MicroPython, terminate the shell and close its pipes.

        Every step is best effort: the process may already have exited, and
        that must not prevent the remaining cleanup from running.
        """
        log('--TEARDOWN STARTED')

        process = cls.process

        if process is not None:
            try:
                cls.send(CTRL_C)
                cls.send(CTRL_C)
                cls.send(CTRL_D)
            except (OSError, ValueError) as exc:
                log('--TEARDOWN SEND FAILED', exc)

            try:
                if not process.stdin.closed:
                    process.stdin.close()
            except OSError as exc:
                log('--TEARDOWN STDIN CLOSE FAILED', exc)

            try:
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)
            except OSError as exc:
                log('--TEARDOWN KILL FAILED', exc)

            process.wait()

            for stream in (process.stdout, process.stderr):
                if not stream.closed:
                    stream.close()

            cls.process = None

        log('--TEARDOWN FINISHED')

    @staticmethod
    def _load_reference_pixels():
        """Return the reference image as a list of ``(r, g, b)`` tuples."""
        with Image.open(IMG_PATH) as image:
            res_img = image.convert('RGB')

        res_data = list(res_img.getdata())
        res_img.close()
        return res_data

    def _paste_test_code(self, test_code):
        """Enter paste mode and type *test_code* into the REPL."""
        self.send(CTRL_E)

        _, error_data = self.read_until(PASTE_PROMPT)
        if error_data:
            self.fail(self._as_text(error_data))

        # NOTE(review): only CRLF separates lines here, so a script saved
        # with LF line endings is sent as one chunk. Left as is because the
        # pacing below may rely on it; confirm which line endings
        # micropy.py uses.
        for line in test_code.split(b'\r\n'):
            self.send(line + b'\n')
            _, error_data = self.read_until(b'\n')

            if error_data:
                self.fail(self._as_text(error_data))

            time.sleep(0.002)

    def _collect_frame(self, td):
        """Thread target: run the pasted code and gather the frame lines.

        ``td.event`` is set only when a complete frame was collected. On an
        error ``td.error_data`` holds the details and the event stays clear.
        """
        exit_event = self.__class__.exit_event

        td.error_data = b''
        td.watchdog_timer = time.time()
        td.result = []

        try:
            self.send(CTRL_D)

            _, td.error_data = self.read_until(b'FRAME START\n')
            # Stop here on an error; the next read would overwrite it.
            if td.error_data or exit_event.is_set():
                return

            td.watchdog_timer = time.time()

            while True:
                lne, td.error_data = self.read_until(b'\n')

                if td.error_data or exit_event.is_set():
                    return

                if b'FRAME END' in lne:
                    break

                td.watchdog_timer = time.time()
                td.result.append(lne)

        except Exception:
            td.error_data = traceback.format_exc()
            return

        td.event.set()

    def test_1_image_compare(self):
        """Render a frame in MicroPython and compare it with the reference."""
        res_data = self._load_reference_pixels()

        with open(TEST_PATH, 'rb') as f:
            test_code = f.read()

        log(f'--RUNNING TEST ({TEST_PATH})')

        test_code = test_code.strip()

        if self.__class__.process is None:
            self.fail('MicroPython failure.')

        self._paste_test_code(test_code)

        test_data = TestData()

        t = threading.Thread(
            target=self._collect_frame,
            args=(test_data,)
        )
        t.daemon = True
        test_data.watchdog_timer = time.time()

        t.start()

        # Wait for the frame, but give up as soon as the reader thread has
        # ended (it hit an error) or has made no progress for too long.
        while (
            t.is_alive() and
            not test_data.event.is_set() and
            time.time() - test_data.watchdog_timer <= self.WATCHDOG_TIMEOUT
        ):
            test_data.event.wait(0.05)

        completed = test_data.event.is_set()

        if not completed:
            # Makes the reader thread leave read_until() on its next byte.
            self.__class__.exit_event.set()

        try:
            self.send(CTRL_C)
            self.send(CTRL_C)
        except (OSError, ValueError) as exc:
            # The process is gone; report the captured error below instead
            # of a broken pipe.
            log('--INTERRUPT FAILED', exc)

        if test_data.error_data:
            self.fail(self._as_text(test_data.error_data))

        if not completed:
            message = 'Timed out waiting for frame data from MicroPython.'
            error = self._saved_errors()
            if error:
                message += '\n' + format_error_data(error)
            self.fail(message)

        try:
            frame = bytearray(
                b''.join(binascii.unhexlify(lne) for lne in test_data.result)
            )

            # I don't exactly know why the byte order is backwards but it is.
            # Swap the first and third byte of every 3 byte pixel.
            frame[0::3], frame[2::3] = frame[2::3], frame[0::3]

            image = Image.new(
                'RGB',
                (self.FRAME_WIDTH, self.FRAME_HEIGHT)
            )
            image.frombytes(bytes(frame))
            img = image.convert('RGB')
            image.close()

            byte_data = list(img.getdata())
            img.save(os.path.join(ARTIFACT_PATH, 'frame.png'), 'PNG')
            img.close()

            with open(os.path.join(ARTIFACT_PATH, 'frame.bin'), 'wb') as f:
                # The PIL data is formatted as [(r, g, b), (r, g, b)] and
                # has to be flattened before it can be written.
                f.write(bytes(
                    item for sublist in byte_data
                    for item in sublist
                ))

        except binascii.Error:
            error = self._saved_errors()
            if error:
                self.fail(format_error_data(error))
            else:
                self.fail(self._as_text(b'\n'.join(test_data.result)))

        except Exception:
            self.fail(traceback.format_exc())

        self.assertEqual(res_data, byte_data, 'Frames do not match')
356
357
# Paths given on the command line are resolved against the directory the
# script was started from.
cwd = os.path.abspath(os.getcwd())

# Defaults used when the module is imported instead of run as a script, or
# when the matching command line option is omitted.
ARTIFACT_PATH = os.path.join(cwd, 'artifacts')
MICROPYTHON_PATH = os.path.join(cwd, 'micropython')


def _parse_args(argv=None):
    """Parse the command line options of the test script.

    :param argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    :return: namespace with ``artifact_path``, ``mpy_path`` and ``debug``.
        The two paths default to ``'artifacts'`` and ``'micropython'`` so
        that omitting an option selects the module-level default location.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--artifact-path',
        dest='artifact_path',
        help='path to save artifacts to',
        action='store',
        default='artifacts'
    )
    arg_parser.add_argument(
        '--mpy-path',
        dest='mpy_path',
        help='path to micropython',
        action='store',
        default='micropython'
    )
    arg_parser.add_argument(
        '--debug',
        dest='debug',
        help='debug output',
        action='store_true'
    )

    return arg_parser.parse_args(argv)


if __name__ == '__main__':
    import sys

    args = _parse_args()

    ARTIFACT_PATH = os.path.join(cwd, args.artifact_path)
    MICROPYTHON_PATH = os.path.join(cwd, args.mpy_path)
    DEBUG = args.debug

    if not os.path.exists(ARTIFACT_PATH):
        raise RuntimeError(f'Artifact path does not exist ({ARTIFACT_PATH})')

    if not os.path.exists(MICROPYTHON_PATH):
        raise RuntimeError(f'MicroPython binary not found ({MICROPYTHON_PATH})')

    debug_log_path = os.path.join(ARTIFACT_PATH, 'debug.log')
    debug_log = open(debug_log_path, 'w')

    try:
        # exit=False stops unittest.main() from calling sys.exit() itself,
        # which would skip closing the log and printing the hint below.
        program = unittest.main(argv=[sys.argv[0], '-v'], exit=False)
    finally:
        debug_log.close()

    print(f'View the debug output in "{debug_log_path}"')

    # Exit with the status unittest.main() would have used on its own.
    sys.exit(not program.result.wasSuccessful())
406