1#!/usr/bin/env python3
2# Copyright (c) 2023 Intel Corporation
3# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
4#
5# SPDX-License-Identifier: Apache-2.0
6"""
7Tests for handlers.py classes' methods
8"""
9
10import itertools
11import mock
12import os
13import pytest
14import signal
15import subprocess
16import sys
17
18from contextlib import nullcontext
19from importlib import reload
20from serial import SerialException
21from subprocess import CalledProcessError, TimeoutExpired
22from types import SimpleNamespace
23
24import twisterlib.harness
25
26ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
27
28from twisterlib.error import TwisterException
29from twisterlib.statuses import TwisterStatus
30from twisterlib.handlers import (
31    Handler,
32    BinaryHandler,
33    DeviceHandler,
34    QEMUHandler,
35    SimulationHandler
36)
37from twisterlib.hardwaremap import (
38    DUT
39)
40
@pytest.fixture
def mocked_instance(tmp_path):
    """Provide a mocked test instance with a real build directory.

    The returned mock carries a testsuite (empty ``source_dir``, ``timeout``
    of 60), a platform (empty ``binaries``, ``timeout_multiplier`` of 2),
    a ``build_dir`` created under pytest's ``tmp_path`` and an initial
    status of ``TwisterStatus.NONE`` with reason ``'Unknown'``.
    """
    suite_mock = mock.Mock()
    suite_type = type(suite_mock)
    suite_type.source_dir = mock.PropertyMock(return_value='')
    suite_type.timeout = mock.PropertyMock(return_value=60)

    platform_mock = mock.Mock()
    platform_type = type(platform_mock)
    platform_type.binaries = mock.PropertyMock(return_value=[])
    platform_type.timeout_multiplier = mock.PropertyMock(return_value=2)

    # Create the directory on disk and expose its path as a string property.
    build_path = tmp_path / 'build_dir'
    os.makedirs(build_path)

    fake_instance = mock.Mock()
    type(fake_instance).build_dir = mock.PropertyMock(
        return_value=str(build_path)
    )
    fake_instance.testsuite = suite_mock
    fake_instance.platform = platform_mock
    fake_instance.status = TwisterStatus.NONE
    fake_instance.reason = 'Unknown'

    return fake_instance
66
67
@pytest.fixture
def faux_timer():
    """Provide a fake clock whose ``time()`` grows by one on every call."""
    class Counter:
        """Deterministic clock replacement: ``time()`` yields 1, 2, 3, ..."""

        def __init__(self):
            self.t = 0

        def time(self):
            ticked = self.t + 1
            self.t = ticked
            return ticked

    clock = Counter()
    return clock
79
80
# Cases for test_imports. Columns:
# (fail_serial, fail_pty, os_name, expected_outs, expected_error)
#   expected_outs  - strings that must appear in the captured stdout
#   expected_error - exception type expected from the reload, or None
TESTDATA_1 = [
    (True, False, 'posix', ['Install pyserial python module with pip to use' \
     ' --device-testing option.'], None),
    (False, True, 'nt', [], None),
    (True, True, 'posix', ['Install pyserial python module with pip to use' \
     ' --device-testing option.'], ImportError),
]
88
@pytest.mark.parametrize(
    'fail_serial, fail_pty, os_name, expected_outs, expected_error',
    TESTDATA_1,
    ids=['import serial', 'import pty nt', 'import serial+pty posix']
)
def test_imports(
    capfd,
    fail_serial,
    fail_pty,
    os_name,
    expected_outs,
    expected_error
):
    """Reload twisterlib.handlers while 'serial' and/or 'pty' fail to import.

    Checks that the reload raises ``expected_error`` (when one is given) and
    that every string in ``expected_outs`` shows up on the captured stdout.
    """
    # Module name -> whether importing it has to fail in this case.
    blocked = {'serial': fail_serial, 'pty': fail_pty}

    class ImportRaiser:
        """Meta path finder that refuses to locate the blocked modules."""

        def find_spec(self, fullname, path, target=None):
            if blocked.get(fullname):
                raise ImportError()

    patched_modules = dict(sys.modules)
    for module_name, must_fail in blocked.items():
        patched_modules[module_name] = (
            None if must_fail else patched_modules[module_name]
        )

    # The raising finder goes first so it is consulted before the real ones.
    patched_meta_path = [ImportRaiser()] + list(sys.meta_path)

    if expected_error:
        error_context = pytest.raises(expected_error)
    else:
        error_context = nullcontext()

    with mock.patch('os.name', os_name), \
         mock.patch.dict('sys.modules', patched_modules, clear=True), \
         mock.patch('sys.meta_path', patched_meta_path), \
         error_context:
        reload(twisterlib.handlers)

    captured_out, _ = capfd.readouterr()
    assert all(fragment in captured_out for fragment in expected_outs)
124
125
def test_handler_final_handle_actions(mocked_instance):
    """Check Handler._final_handle_actions() with a harness whose run id mismatches.

    The first call must fail the instance and its testcases with the reason
    'RunID mismatch'; a second call must leave a manually set reason intact
    and the harness recording must have been passed to ``instance.record``.
    """
    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance, 'build', mock.Mock())
    handler.suite_name_check = True

    harness = twisterlib.harness.Test()
    harness.status = TwisterStatus.NONE
    harness.detected_suite_names = mock.Mock()
    harness.matched_run_id = False
    harness.run_id_exists = True
    harness.recording = mock.Mock()

    elapsed = mock.Mock()

    handler._final_handle_actions(harness, elapsed)

    handled = handler.instance
    assert handled.status == TwisterStatus.FAIL
    assert handled.execution_time == elapsed
    assert handled.reason == 'RunID mismatch'
    assert all(
        case.status == TwisterStatus.FAIL for case in handled.testcases
    )

    preset_reason = "This reason shan't be changed."
    handler.instance.reason = preset_reason
    handler._final_handle_actions(harness, elapsed)

    mocked_instance.assert_has_calls([mock.call.record(harness.recording)])

    assert handler.instance.reason == preset_reason
156
157
# Cases for test_handler_verify_ztest_suite_name. Columns:
# (detected_suite_names, should_be_called)
#   should_be_called - whether Handler._missing_suite_name is expected to run
TESTDATA_2 = [
    (['dummy_testsuite_name'], False),
    ([], True),
    (['another_dummy_name', 'yet_another_dummy_name'], True),
]
163
@pytest.mark.parametrize(
    'detected_suite_names, should_be_called',
    TESTDATA_2,
    ids=['detected one expected', 'detected none', 'detected two unexpected']
)
def test_handler_verify_ztest_suite_name(
    mocked_instance,
    detected_suite_names,
    should_be_called
):
    """Check when _verify_ztest_suite_name() reports a missing suite name.

    The testsuite expects 'dummy_testsuite_name'; ``_missing_suite_name`` is
    patched and must be called exactly once or not at all, as the case says.
    """
    type(mocked_instance.testsuite).ztest_suite_names = [
        'dummy_testsuite_name'
    ]

    with mock.patch.object(Handler, '_missing_suite_name') as missing_name:
        handler = Handler(mocked_instance, 'build', mock.Mock())
        handler._verify_ztest_suite_name(
            TwisterStatus.PASS,
            detected_suite_names,
            mock.Mock()
        )

        check_calls = (
            missing_name.assert_called_once if should_be_called
            else missing_name.assert_not_called
        )
        check_calls()
193
194
def test_handler_missing_suite_name(mocked_instance):
    """Check that _missing_suite_name() fails the instance and its testcases.

    After the call the instance must be FAIL with reason 'Testsuite mismatch',
    carry the passed handler time, and every testcase must be FAIL as well.
    """
    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance, 'build', mock.Mock())
    handler.suite_name_check = True

    elapsed = mock.Mock()

    handler._missing_suite_name(['dummy_testsuite_name'], elapsed)

    failed = handler.instance
    assert failed.status == TwisterStatus.FAIL
    assert failed.execution_time == elapsed
    assert failed.reason == 'Testsuite mismatch'
    assert all(case.status == TwisterStatus.FAIL for case in failed.testcases)
214
215
def test_handler_terminate(mocked_instance):
    """Check Handler.terminate() on a process with two child processes.

    ``os.kill`` is replaced by a fake that raises ProcessLookupError for
    negative PIDs; the second round checks that both children are still
    signalled with SIGTERM when the first one triggers that error.
    """
    def fake_os_kill(pid, sig):
        if pid < 0:
            raise ProcessLookupError

    handler = Handler(mocked_instance, 'build', mock.Mock())

    first_child = mock.Mock(pid=1)
    second_child = mock.Mock(pid=2)
    psutil_process = mock.Mock()
    psutil_process.children = mock.Mock(
        return_value=[first_child, second_child]
    )

    proc = mock.Mock(pid=0)
    proc.terminate = mock.Mock(return_value=None)
    proc.kill = mock.Mock(return_value=None)

    kill_mock = mock.Mock(side_effect=fake_os_kill)

    with mock.patch('psutil.Process', return_value=psutil_process), \
         mock.patch('os.kill', kill_mock):
        handler.terminate(proc)

        assert handler.terminated
        proc.terminate.assert_called_once()
        proc.kill.assert_called_once()
        kill_mock.assert_has_calls(
            [mock.call(pid, signal.SIGTERM) for pid in (1, 2)]
        )

        # Second round: the first child now has a PID the fake kill rejects.
        vanished_child = mock.Mock(pid=-1)
        psutil_process.children = mock.Mock(
            return_value=[vanished_child, second_child]
        )
        handler.terminated = False
        kill_mock.reset_mock()

        handler.terminate(proc)

    kill_mock.assert_has_calls(
        [mock.call(pid, signal.SIGTERM) for pid in (-1, 2)]
    )
260
261
def test_binaryhandler_try_kill_process_by_pid(mocked_instance):
    """Check BinaryHandler.try_kill_process_by_pid() for two PID file contents.

    For PID '1' and PID '-1' (the latter makes the fake ``os.kill`` raise
    ProcessLookupError) the PID file must be unlinked once and ``os.kill``
    must be called once with the parsed PID and SIGKILL.
    """
    def fake_os_kill(pid, sig):
        if pid < 0:
            raise ProcessLookupError

    pid_file = os.path.join('dummy', 'path', 'to', 'pid.pid')
    handler = BinaryHandler(mocked_instance, 'build', mock.Mock())

    kill_mock = mock.Mock(side_effect=fake_os_kill)
    unlink_mock = mock.Mock()

    with mock.patch('os.kill', kill_mock), \
         mock.patch('os.unlink', unlink_mock):
        for pid_text, expected_pid in (('1', 1), ('-1', -1)):
            # Start every round with clean call records and a fresh pid_fn.
            unlink_mock.reset_mock()
            kill_mock.reset_mock()
            handler.pid_fn = pid_file

            fake_open = mock.mock_open(read_data=pid_text)
            with mock.patch('builtins.open', fake_open):
                handler.try_kill_process_by_pid()

            unlink_mock.assert_called_once_with(pid_file)
            kill_mock.assert_called_once_with(expected_pid, signal.SIGKILL)
296
297
# Cases for test_binaryhandler_output_handler. Columns:
# (proc_stdout, harness, expected_handler_calls, expected_harness_calls,
#  should_be_less, timeout_wait)
#   proc_stdout            - byte lines returned one by one by the fake stdout
#   expected_handler_calls - calls expected on the log file's write(), or None
#   expected_harness_calls - calls expected on harness.handle(), or None
#   should_be_less         - True: fewer writes than lines; False: exactly as
#                            many writes as lines; None: not checked
#   timeout_wait           - the fake proc.wait() raises TimeoutExpired
TESTDATA_3 = [
    (
        [b'This\\r\\n\n', b'is\r', b'some \x1B[31mANSI\x1B[39m in\n', b'a short\n', b'file.'],
        mock.Mock(status=TwisterStatus.NONE, capture_coverage=False),
        [
            mock.call('This\\r\\n\n'),
            mock.call('is\r'),
            mock.call('some ANSI in\n'),
            mock.call('a short\n'),
            mock.call('file.')
        ],
        [
            mock.call('This'),
            mock.call('is'),
            mock.call('some \x1B[31mANSI\x1B[39m in'),
            mock.call('a short'),
            mock.call('file.')
        ],
        None,
        False
    ),
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(status=TwisterStatus.PASS, capture_coverage=False),
        None,
        None,
        True,
        False
    ),
    # NOTE(review): this case is identical to the previous one although its
    # test id is 'timeout with harness status' - confirm whether a different
    # harness status was intended.
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(status=TwisterStatus.PASS, capture_coverage=False),
        None,
        None,
        True,
        False
    ),
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(status=TwisterStatus.PASS, capture_coverage=True),
        None,
        None,
        False,
        True
    ),
]
344
@pytest.mark.parametrize(
    'proc_stdout, harness, expected_handler_calls,'
    ' expected_harness_calls, should_be_less, timeout_wait',
    TESTDATA_3,
    ids=[
        'no timeout',
        'timeout',
        'timeout with harness status',
        'timeout with capture_coverage, wait timeout'
    ]
)
def test_binaryhandler_output_handler(
    mocked_instance,
    faux_timer,
    proc_stdout,
    harness,
    expected_handler_calls,
    expected_harness_calls,
    should_be_less,
    timeout_wait
):
    """Run BinaryHandler._output_handler() against a scripted fake process.

    ``time.time`` is replaced by the ``faux_timer`` fixture and ``open`` by a
    mock, so the test can inspect what was written to the handler log and
    what was passed to ``harness.handle``.
    """
    class MockStdout(mock.Mock):
        # Fake stdout that serves the given byte lines one at a time.
        def __init__(self, text):
            super().__init__(text)
            self.text = text
            self.line_index = 0

        def readline(self):
            # After the last line return b'' once and rewind to the start.
            if self.line_index == len(self.text):
                self.line_index = 0
                return b''
            else:
                line = self.text[self.line_index]
                self.line_index += 1
                return line

    class MockProc(mock.Mock):
        # Fake process whose stdout is a MockStdout over the given lines.
        def __init__(self, pid, stdout):
            super().__init__(pid, stdout)
            self.pid = mock.PropertyMock(return_value=pid)
            self.stdout = MockStdout(stdout)

        def wait(self, *args, **kwargs):
            # Only the 'wait timeout' case makes waiting for the process fail.
            if timeout_wait:
                raise TimeoutExpired('dummy cmd', 'dummyamount')

    handler = BinaryHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1))
    handler.terminate = mock.Mock()

    proc = MockProc(1, proc_stdout)

    with mock.patch(
        'builtins.open',
        mock.mock_open(read_data='')
    ) as mock_file, \
         mock.patch('time.time', side_effect=faux_timer.time):
        handler._output_handler(proc, harness)

        # The handler log must have been opened for writing.
        mock_file.assert_called_with(handler.log, 'w')

    if expected_handler_calls:
        mock_file.return_value.write.assert_has_calls(expected_handler_calls)
    if expected_harness_calls:
        harness.handle.assert_has_calls(expected_harness_calls)
    if should_be_less is not None:
        # Compare the number of log writes with the number of scripted lines.
        if should_be_less:
            assert mock_file.return_value.write.call_count < len(proc_stdout)
        else:
            assert mock_file.return_value.write.call_count == len(proc_stdout)
    if timeout_wait:
        handler.terminate.assert_called_once_with(proc)
416
417
# Cases for test_binaryhandler_create_command. Columns:
# (robot_test, call_make_run, enable_valgrind, seed, extra_args, expected)
#   expected - the exact command list _create_command() must return
TESTDATA_4 = [
    (True, False, True, None, None,
     ['valgrind', '--error-exitcode=2', '--leak-check=full',
      f'--suppressions={ZEPHYR_BASE}/scripts/valgrind.supp',
      '--log-file=build_dir/valgrind.log', '--track-origins=yes',
      'generator']),
    (False, True, False, 123, None, ['generator', '-C', 'build_dir', 'run', '--seed=123']),
    (False, False, False, None, ['ex1', 'ex2'], ['build_dir/zephyr/zephyr.exe', 'ex1', 'ex2']),
]
427
@pytest.mark.parametrize(
    'robot_test, call_make_run, enable_valgrind, seed, extra_args, expected',
    TESTDATA_4,
    ids=['robot, valgrind', 'make run, seed', 'binary, extra']
)
def test_binaryhandler_create_command(
    mocked_instance,
    robot_test,
    call_make_run,
    enable_valgrind,
    seed,
    extra_args,
    expected
):
    """Check the command list built by BinaryHandler._create_command()."""
    handler_options = SimpleNamespace(
        enable_valgrind=enable_valgrind,
        coverage_basedir="coverage_basedir",
        sim_name=None
    )
    handler = BinaryHandler(
        mocked_instance, 'build', handler_options, 'generator', False
    )

    # Handler state that the parametrized cases vary or rely on.
    handler.binary = 'bin'
    handler.call_make_run = call_make_run
    handler.seed = seed
    handler.extra_test_args = extra_args
    handler.build_dir = 'build_dir'
    handler.instance.sysbuild = False
    handler.platform = SimpleNamespace(resc="file.resc", uart="uart")

    assert handler._create_command(robot_test) == expected
461
462
# Cases for test_binaryhandler_create_env. Columns:
# (enable_asan, enable_lsan, enable_ubsan)
TESTDATA_5 = [
    (False, False, False),
    (True, False, False),
    (True, True, False),
    (False, False, True),
]
469
@pytest.mark.parametrize(
    'enable_asan, enable_lsan, enable_ubsan',
    TESTDATA_5,
    ids=['none', 'asan', 'asan, lsan', 'ubsan']
)
def test_binaryhandler_create_env(
    mocked_instance,
    enable_asan,
    enable_lsan,
    enable_ubsan
):
    """Check the environment produced by BinaryHandler._create_env().

    Existing variables must be carried over and the sanitizer option
    variables must contain both their previous value and the extra options
    expected for the enabled sanitizers.
    """
    sanitizer_options = mock.Mock(
        enable_asan=enable_asan,
        enable_lsan=enable_lsan,
        enable_ubsan=enable_ubsan
    )
    handler = BinaryHandler(mocked_instance, 'build', sanitizer_options)

    base_env = {
        'example_env_var': True,
        'ASAN_OPTIONS': 'dummy=dummy:',
        'UBSAN_OPTIONS': 'dummy=dummy:'
    }

    with mock.patch('os.environ', base_env):
        created_env = handler._create_env()

    assert base_env['example_env_var'] == created_env['example_env_var']

    if enable_ubsan:
        assert base_env['UBSAN_OPTIONS'] in created_env['UBSAN_OPTIONS']
        for option in ('log_path=stdout:', 'halt_on_error=1:'):
            assert option in created_env['UBSAN_OPTIONS']

    if enable_asan:
        assert base_env['ASAN_OPTIONS'] in created_env['ASAN_OPTIONS']
        assert 'log_path=stdout:' in created_env['ASAN_OPTIONS']

    if enable_asan and not enable_lsan:
        assert 'detect_leaks=0' in created_env['ASAN_OPTIONS']
509
510
# Cases for test_binaryhandler_update_instance_info. Columns:
# (harness_status, terminated, returncode, enable_valgrind,
#  expected_status, expected_reason, do_add_missing)
#   do_add_missing - whether instance.add_missing_case_status() must be
#                    called with TwisterStatus.BLOCK and the expected reason
TESTDATA_6 = [
    (TwisterStatus.NONE, False, 2, True, TwisterStatus.FAIL, 'Valgrind error', False),
    (TwisterStatus.NONE, False, 1, False, TwisterStatus.FAIL, 'Failed (rc=1)', False),
    (TwisterStatus.FAIL, False, 0, False, TwisterStatus.FAIL, "Failed harness:'foobar'", False),
    ('success', False, 0, False, 'success', 'Unknown', False),
    (TwisterStatus.NONE, True, 1, True, TwisterStatus.FAIL, 'Timeout', True),
]
518
@pytest.mark.parametrize(
    (
        'harness_status, terminated, returncode, enable_valgrind,'
        ' expected_status, expected_reason, do_add_missing'
    ),
    TESTDATA_6,
    ids=['valgrind error', 'failed', 'harness failed', 'custom success', 'no status']
)
def test_binaryhandler_update_instance_info(
    mocked_instance,
    harness_status,
    terminated,
    returncode,
    enable_valgrind,
    expected_status,
    expected_reason,
    do_add_missing
):
    """Check the status and reason set by _update_instance_info()."""
    elapsed = 59
    add_missing = mock.Mock()
    fake_harness = mock.Mock(status=harness_status, reason="foobar")

    handler = BinaryHandler(
        mocked_instance,
        'build',
        mock.Mock(enable_valgrind=enable_valgrind)
    )
    handler.terminated = terminated
    handler.returncode = returncode
    handler.instance.add_missing_case_status = add_missing

    handler._update_instance_info(fake_harness, elapsed)

    updated = handler.instance
    assert updated.execution_time == elapsed
    assert updated.status == expected_status
    assert updated.reason == expected_reason

    if do_add_missing:
        add_missing.assert_called_once_with(
            TwisterStatus.BLOCK, expected_reason
        )
554
555
# Cases for test_binaryhandler_handle. Columns:
# (is_robot_test, coverage, isatty)
TESTDATA_7 = [
    (True, False, False),
    (False, True, False),
    (False, False, True),
]
561
@pytest.mark.parametrize(
    'is_robot_test, coverage, isatty',
    TESTDATA_7,
    ids=['robot test', 'coverage', 'isatty']
)
def test_binaryhandler_handle(
    mocked_instance,
    caplog,
    is_robot_test,
    coverage,
    isatty
):
    """Check BinaryHandler.handle() with process and thread creation mocked.

    Robot tests must be delegated to ``harness.run_robot_test``; otherwise
    the output thread must be joined, the instance info updated and the
    final actions run, with ``stty sane`` invoked when stdout is a TTY.
    """
    worker_thread = mock.Mock()

    def fake_popen(command, *args, **kwargs):
        spawned = mock.Mock(pid=0, returncode=0)
        return mock.Mock(
            __enter__=mock.Mock(return_value=spawned),
            __exit__=mock.Mock(return_value=None)
        )

    def fake_thread(target, *args, **kwargs):
        return worker_thread

    handler = BinaryHandler(
        mocked_instance, 'build', mock.Mock(coverage=coverage)
    )
    handler.sourcedir = 'source_dir'
    handler.build_dir = 'build_dir'
    handler.name = 'Dummy Name'

    # Replace the collaborators of handle() so only its own flow is tested.
    handler._create_command = mock.Mock(return_value=['dummy', 'command'])
    handler._create_env = mock.Mock(return_value=[])
    handler._update_instance_info = mock.Mock()
    handler._final_handle_actions = mock.Mock()
    handler.terminate = mock.Mock()
    handler.try_kill_process_by_pid = mock.Mock()

    run_robot = mock.Mock()
    harness = mock.Mock(is_robot_test=is_robot_test, run_robot_test=run_robot)

    subprocess_call = mock.Mock()
    popen_mock = mock.Mock(side_effect=fake_popen)
    thread_mock = mock.Mock(side_effect=fake_thread)

    with mock.patch('subprocess.call', subprocess_call), \
         mock.patch('subprocess.Popen', popen_mock), \
         mock.patch('threading.Thread', thread_mock), \
         mock.patch('sys.stdout.isatty', return_value=isatty):
        handler.handle(harness)

    if is_robot_test:
        run_robot.assert_called_once_with(['dummy', 'command'], mock.ANY)
        return

    assert 'Spawning BinaryHandler Thread for Dummy Name' in caplog.text

    worker_thread.join.assert_called()
    handler._update_instance_info.assert_called_once()
    handler._final_handle_actions.assert_called_once()

    if isatty:
        subprocess_call.assert_any_call(['stty', 'sane'], stdin=mock.ANY)
621
622
# Cases for test_simulationhandler_init. Columns:
# (type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready)
#   type_str is also used as the test id.
TESTDATA_8 = [
    ('renode', True, True, False, False),
    ('native', False, False, False, True),
    ('build', False, True, False, False),
]
628
@pytest.mark.parametrize(
    'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready',
    TESTDATA_8,
    ids=[case[0] for case in TESTDATA_8]
)
def test_simulationhandler_init(
    mocked_instance,
    type_str,
    is_pid_fn,
    expected_call_make_run,
    is_binary,
    expected_ready
):
    """Check the attributes SimulationHandler sets up for each handler type."""
    handler = SimulationHandler(mocked_instance, type_str, mock.Mock())

    assert handler.call_make_run == expected_call_make_run
    assert handler.ready == expected_ready

    if is_pid_fn:
        renode_pid_path = os.path.join(
            mocked_instance.build_dir, 'renode.pid'
        )
        assert handler.pid_fn == renode_pid_path

    if is_binary:
        binary_path = os.path.join(
            mocked_instance.build_dir, 'zephyr', 'zephyr.exe'
        )
        assert handler.pid_fn == binary_path
653
654
# Cases for test_devicehandler_monitor_serial. Columns:
# (success_count, in_waiting_count, oserror_count, readline_error_count,
#  haltless_count, statusless_count, end_by_halt, end_by_close,
#  end_by_status, expected_line_count)
#   The *_count values size the scripted side effects of the serial, halt
#   event and harness mocks; expected_line_count is the number of
#   'line no N' calls expected on harness.handle().
TESTDATA_9 = [
    (3, 2, 0, 0, 3, -1, True, False, False, 1),
    (4, 1, 0, 0, -1, -1, False, True, False, 0),
    (5, 0, 1, 2, -1, 4, False, False, True, 3)
]
660
@pytest.mark.parametrize(
    'success_count, in_waiting_count, oserror_count, readline_error_count,'
    ' haltless_count, statusless_count, end_by_halt, end_by_close,'
    ' end_by_status, expected_line_count',
    TESTDATA_9,
    ids=[
      'halt event',
      'serial closes',
      'harness status with errors'
    ]
)
def test_devicehandler_monitor_serial(
    mocked_instance,
    success_count,
    in_waiting_count,
    oserror_count,
    readline_error_count,
    haltless_count,
    statusless_count,
    end_by_halt,
    end_by_close,
    end_by_status,
    expected_line_count
):
    """Check DeviceHandler.monitor_serial() against scripted serial behaviour.

    The serial port, halt event and harness status are mocks whose side
    effects are sized by the ``*_count`` parameters. Exactly one of
    ``end_by_halt``, ``end_by_close`` and ``end_by_status`` is set per case
    and selects which of them is scripted to end the monitoring.
    """
    # The port reports itself as open forever.
    is_open_iter = iter(lambda: True, False)

    # readline() first raises alternating errors, then yields the real lines.
    readline_errors = [
        TypeError('dummy TypeError') if x % 2
        else SerialException('dummy SerialException')
        for x in range(readline_error_count)
    ]
    line_iter = readline_errors + [
        f'line no {idx}'.encode('utf-8') for idx in range(success_count)
    ]

    if end_by_close:
        # A few empty polls, then in_waiting itself raises.
        in_waiting_iter = [False] * in_waiting_count + [
            TypeError('dummy TypeError')
        ]
    else:
        in_waiting_iter = (
            [OSError('dummy OSError')] * oserror_count
            + [False] * in_waiting_count
            + [True] * (success_count + readline_error_count)
        )

    if end_by_halt:
        is_set_iter = [False] * haltless_count + [True]
    else:
        is_set_iter = iter(lambda: False, True)

    if end_by_status:
        status_iter = [TwisterStatus.NONE] * statusless_count + [
            TwisterStatus.PASS
        ]
    else:
        status_iter = iter(lambda: TwisterStatus.NONE, TwisterStatus.PASS)

    halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter))
    ser = mock.Mock(
        isOpen=mock.Mock(side_effect=is_open_iter),
        readline=mock.Mock(side_effect=line_iter)
    )
    type(ser).in_waiting = mock.PropertyMock(
        side_effect=in_waiting_iter,
        return_value=False
    )
    harness = mock.Mock(capture_coverage=False)
    type(harness).status = mock.PropertyMock(side_effect=status_iter)

    handler = DeviceHandler(
        mocked_instance,
        'build',
        mock.Mock(enable_coverage=not end_by_status)
    )

    with mock.patch('builtins.open', mock.mock_open(read_data='')):
        handler.monitor_serial(ser, halt_event, harness)

    if not end_by_close:
        ser.close.assert_called_once()

    harness.handle.assert_has_calls(
        [mock.call(f'line no {idx}') for idx in range(expected_line_count)]
    )
731
732
def test_devicehandler_monitor_serial_splitlines(mocked_instance):
    """Check that a multi-line serial read gives two harness.handle() calls.

    readline() returns '\\nline1\\nline2\\n' in one piece.
    """
    serial_payload = b'\nline1\nline2\n'

    never_halted = mock.Mock(is_set=mock.Mock(return_value=False))
    passing_harness = mock.Mock(status=TwisterStatus.PASS)
    serial_port = mock.Mock(
        isOpen=mock.Mock(side_effect=[True, True, False]),
        in_waiting=mock.Mock(return_value=False),
        readline=mock.Mock(return_value=serial_payload)
    )

    handler = DeviceHandler(
        mocked_instance, 'build', mock.Mock(enable_coverage=False)
    )

    fake_open = mock.mock_open(read_data='')
    with mock.patch('builtins.open', fake_open):
        handler.monitor_serial(serial_port, never_halted, passing_harness)

    assert passing_harness.handle.call_count == 2
748
749
# Cases for test_devicehandler_device_is_available. Columns:
# (platform_name, fixture, duts, expected)
#   expected - an int is the index in duts of the device that must be
#              selected, None means no device is returned, and an exception
#              type means device_is_available() must raise it.
TESTDATA_10 = [
    # All DUTs available and without failures; duts[3] is expected.
    (
        'dummy_platform',
        'dummy fixture',
        [
            mock.Mock(
                fixtures=[],
                platform='dummy_platform',
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='another_platform',
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=None,
                serial=None,
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            )
        ],
        3
    ),
    # Same DUTs, but duts[3] has one recorded failure; duts[4] is expected.
    (
        'dummy_platform',
        'dummy fixture',
        [
            mock.Mock(
                fixtures=[],
                platform='dummy_platform',
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='another_platform',
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=None,
                serial=None,
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=1,
                failures=1,
                counter_increment=mock.Mock(),
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=1,
                failures=0,
                counter_increment=mock.Mock(),
                counter=0
            )
        ],
        4
    ),
    # No DUTs at all; TwisterException is expected.
    (
        'dummy_platform',
        'dummy fixture',
        [],
        TwisterException
    ),
    # Every DUT has available=0; no device is expected.
    (
        'dummy_platform',
        'dummy fixture',
        [
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                counter_increment=mock.Mock(),
                failures=0,
                available=0
            ),
            mock.Mock(
                fixtures=['another fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                counter_increment=mock.Mock(),
                failures=0,
                available=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial=mock.Mock(),
                counter_increment=mock.Mock(),
                failures=0,
                available=0
            ),
            mock.Mock(
                fixtures=['another fixture'],
                platform='dummy_platform',
                serial=mock.Mock(),
                counter_increment=mock.Mock(),
                failures=0,
                available=0
            )
        ],
        None
    )
]
899
@pytest.mark.parametrize(
    'platform_name, fixture, duts, expected',
    TESTDATA_10,
    ids=['two good duts, select the first one',
         'two duts, the first was failed once, select the second not failed',
         'exception - no duts', 'no available duts']
)
def test_devicehandler_device_is_available(
    mocked_instance,
    platform_name,
    fixture,
    duts,
    expected
):
    """Check which DUT DeviceHandler.device_is_available() selects.

    ``expected`` is interpreted by type: an int is the index in ``duts`` of
    the device that must be returned (and marked unavailable), ``None``
    means no device is returned, and an exception type means the call must
    raise it. Any other value is a broken test case and fails explicitly.
    """
    mocked_instance.platform.name = platform_name
    mocked_instance.testsuite.harness_config = {'fixture': fixture}

    handler = DeviceHandler(mocked_instance, 'build', mock.Mock())
    handler.duts = duts

    if isinstance(expected, int):
        device = handler.device_is_available(mocked_instance)

        assert device == duts[expected]
        assert device.available == 0
        device.counter_increment.assert_called_once()
    elif expected is None:
        assert handler.device_is_available(mocked_instance) is None
    elif isinstance(expected, type):
        with pytest.raises(expected):
            handler.device_is_available(mocked_instance)
    else:
        # pytest.fail() is not stripped under ``python -O`` like ``assert``.
        pytest.fail(f'Unsupported expected value in test data: {expected!r}')
935
936
def test_devicehandler_make_dut_available(mocked_instance):
    """Check DeviceHandler.make_dut_available() on DUTs sharing a serial.

    Three DUTs start with ``available=0``: the first two reference the same
    serial object (one as ``serial``, one as ``serial_pty``), the third has
    an unrelated serial. The third must stay unavailable throughout.
    """
    serial = mock.Mock(name='dummy_serial')
    duts = [
        mock.Mock(available=0, serial=serial, serial_pty=None),
        mock.Mock(available=0, serial=None, serial_pty=serial),
        mock.Mock(
            available=0,
            # ``name=`` is required: the first positional argument of Mock
            # is ``spec``, not the mock's name.
            serial=mock.Mock(name='another_serial'),
            serial_pty=None
        )
    ]

    handler = DeviceHandler(mocked_instance, 'build', mock.Mock())
    handler.duts = duts

    def available_count():
        return sum(1 for dut in handler.duts if dut.available == 1)

    handler.make_dut_available(duts[1])

    assert available_count() == 1
    assert handler.duts[0].available == 0
    assert handler.duts[2].available == 0

    handler.make_dut_available(duts[0])

    assert available_count() == 2
    assert handler.duts[2].available == 0
962
963
# Cases for test_devicehandler_run_custom_script. Columns:
# (mock_process, raise_timeout)
#   mock_process  - the process object handed out by the patched Popen
#   raise_timeout - whether communicate(timeout=...) raises TimeoutExpired
TESTDATA_11 = [
    (mock.Mock(pid=0, returncode=0), False),
    (mock.Mock(pid=0, returncode=1), False),
    (mock.Mock(pid=0, returncode=1), True)
]
969
@pytest.mark.parametrize(
    'mock_process, raise_timeout',
    TESTDATA_11,
    ids=['proper script', 'error', 'timeout']
)
def test_devicehandler_run_custom_script(caplog, mock_process, raise_timeout):
    """Check DeviceHandler.run_custom_script() for success, error and timeout.

    ``subprocess.Popen`` is patched to hand out ``mock_process``; the log is
    then inspected for the messages expected in each outcome.
    """
    def fake_communicate(timeout=-1):
        # Only the call that passes a timeout is allowed to time out.
        if raise_timeout and timeout != -1:
            raise subprocess.TimeoutExpired(None, timeout)
        return mock.Mock(), mock.Mock()

    def fake_popen(command, *args, **kwargs):
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock_process),
            __exit__=mock.Mock(return_value=None)
        )

    mock_process.communicate = mock.Mock(side_effect=fake_communicate)

    script = [os.path.join('test', 'script', 'path'), 'arg']
    timeout = 60

    with mock.patch('subprocess.Popen', side_effect=fake_popen):
        DeviceHandler.run_custom_script(script, timeout)

    if raise_timeout:
        assert all(
            text in caplog.text.lower() for text in (str(script), 'timed out')
        )
        expected_calls = [
            mock.call.communicate(timeout=timeout),
            mock.call.kill(),
            mock.call.communicate()
        ]
        mock_process.assert_has_calls(expected_calls)
        return

    if mock_process.returncode == 0:
        assert not any(
            record.levelname == 'ERROR' for record in caplog.records
        )
        return

    assert 'timed out' not in caplog.text.lower()
    assert 'custom script failure' in caplog.text.lower()
1012
1013
# Parameters for test_devicehandler_get_hardware, as tuples of
# (num_of_failures, raise_exception):
#   num_of_failures - how many times the stubbed device_is_available()
#                     returns None before handing out the hardware
#   raise_exception - when True, the stub raises TwisterException instead
TESTDATA_12 = [
    (0, False),
    (4, False),
    (0, True)
]
1019
@pytest.mark.parametrize(
    'num_of_failures, raise_exception',
    TESTDATA_12,
    ids=['no failures', 'with failures', 'exception']
)
def test_devicehandler_get_hardware(
    mocked_instance,
    caplog,
    num_of_failures,
    raise_exception
):
    """Check get_hardware() against a stubbed device_is_available().

    The stub either raises TwisterException, or reports "no device" for
    ``num_of_failures`` calls before returning the expected hardware.
    """
    expected_hardware = mock.Mock()

    def mock_availability(handler, instance):
        if raise_exception:
            raise TwisterException('dummy message')
        # Count down the remaining simulated "no device available" answers.
        if handler.no:
            handler.no -= 1
            return None
        return expected_hardware

    handler = DeviceHandler(mocked_instance, 'build', mock.Mock())
    handler.no = num_of_failures

    # time.sleep is stubbed so that, should get_hardware() pause between
    # availability checks, the 'with failures' case does not wait for real.
    with mock.patch.object(
        DeviceHandler,
        'device_is_available',
        mock_availability
    ), mock.patch('time.sleep'):
        hardware = handler.get_hardware()

    if raise_exception:
        assert 'dummy message' in caplog.text.lower()
        assert mocked_instance.status == TwisterStatus.FAIL
        assert mocked_instance.reason == 'dummy message'
    else:
        assert hardware == expected_hardware
1057
1058
# Parameters for test_devicehandler_create_command, as tuples of
# (self_west_flash, runner, hardware_product_name, expected):
#   self_west_flash       - value assigned to handler.options.west_flash
#   runner                - runner name passed to _create_command()
#   hardware_product_name - 'product' attribute of the hardware mock
#   expected              - expected command list; the '$build_dir'
#                           placeholder is replaced with handler.build_dir
#                           by the test before comparing
TESTDATA_13 = [
    (
        None,
        None,
        None,
        ['generator_cmd', '-C', '$build_dir', 'flash']
    ),
    (
        [],
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir']
    ),
    (
        '--dummy',
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--', '--dummy']
    ),
    (
        '--dummy1,--dummy2',
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--', '--dummy1', '--dummy2']
    ),

    (
        None,
        'runner',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'runner', 'param1', 'param2']
    ),

    (
        None,
        'pyocd',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'pyocd', 'param1', 'param2', '--', '--dev-id', 12345]
    ),
    (
        None,
        'nrfjprog',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'nrfjprog', 'param1', 'param2', '--', '--dev-id', 12345]
    ),
    (
        None,
        'openocd',
        'STM32 STLink',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'hla_serial 12345']
    ),
    (
        None,
        'openocd',
        'STLINK-V3',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'hla_serial 12345']
    ),
    (
        None,
        'openocd',
        'EDBG CMSIS-DAP',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'cmsis_dap_serial 12345']
    ),
    (
        None,
        'jlink',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'jlink', '--dev-id', 12345,
         'param1', 'param2']
    ),
    (
        None,
        'stm32cubeprogrammer',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'stm32cubeprogrammer', '--tool-opt=sn=12345',
         'param1', 'param2']
    ),

]

# Values for the 'hardware_probe' parameter of the same test: True stores
# the identifier 12345 in the hardware mock's probe_id, False in its id.
TESTDATA_13_2 = [(True), (False)]
1153
@pytest.mark.parametrize(
    'self_west_flash, runner,' \
    ' hardware_product_name, expected',
    TESTDATA_13,
    ids=['generator', '--west-flash', 'one west flash value',
         'multiple west flash values', 'generic runner', 'pyocd',
         'nrfjprog', 'openocd, STM32 STLink', 'openocd, STLINK-v3',
         'openocd, EDBG CMSIS-DAP', 'jlink', 'stm32cubeprogrammer']
)
@pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id'])
def test_devicehandler_create_command(
    mocked_instance,
    self_west_flash,
    runner,
    hardware_probe,
    hardware_product_name,
    expected
):
    """Compare the command built by _create_command() with the expected one.

    Every runner scenario is exercised twice: once with the identifier
    stored in the hardware's probe_id and once with it stored in its id.
    """
    handler = DeviceHandler(mocked_instance, 'build', mock.Mock())
    handler.options = mock.Mock(west_flash=self_west_flash)
    handler.generator_cmd = 'generator_cmd'

    # Swap the '$build_dir' placeholder for the handler's real build dir.
    resolved = []
    for token in expected:
        if token == '$build_dir':
            resolved.append(handler.build_dir)
        else:
            resolved.append(token)

    # The identifier lives in exactly one of the two attributes.
    if hardware_probe:
        probe_id, dut_id = 12345, None
    else:
        probe_id, dut_id = None, 12345

    hardware = mock.Mock(
        product=hardware_product_name,
        probe_id=probe_id,
        id=dut_id,
        runner_params=['param1', 'param2']
    )

    assert handler._create_command(runner, hardware) == resolved
1189
1190
# Parameters for test_devicehandler_update_instance_info, as tuples of
# (harness_status, flash_error, expected_status, expected_reason,
#  do_add_missing):
#   harness_status  - status attribute of the harness mock
#   flash_error     - flag passed to _update_instance_info()
#   expected_status - instance status expected afterwards
#   expected_reason - instance reason expected afterwards
#   do_add_missing  - whether add_missing_case_status() must have been
#                     called with ('blocked', expected_reason)
TESTDATA_14 = [
    ('success', False, 'success', 'Unknown', False),
    (TwisterStatus.FAIL, False, TwisterStatus.FAIL, "Failed harness:'foobar'", True),
    (TwisterStatus.ERROR, False, TwisterStatus.ERROR, 'Unknown', True),
    (TwisterStatus.NONE, True, TwisterStatus.NONE, 'Unknown', False),
    (TwisterStatus.NONE, False, TwisterStatus.FAIL, 'Timeout', True),
]
1198
@pytest.mark.parametrize(
    'harness_status, flash_error,' \
    ' expected_status, expected_reason, do_add_missing',
    TESTDATA_14,
    ids=['custom success', 'failed', 'error', 'flash error', 'no status']
)
def test_devicehandler_update_instance_info(
        mocked_instance,
        harness_status,
        flash_error,
        expected_status,
        expected_reason,
        do_add_missing
        ):
    """Check the instance fields written by _update_instance_info()."""
    handler_time = 59
    handler = DeviceHandler(mocked_instance, 'build', mock.Mock())
    add_missing = mock.Mock()
    handler.instance.add_missing_case_status = add_missing
    harness = mock.Mock(status=harness_status, reason="foobar")

    handler._update_instance_info(harness, handler_time, flash_error)

    instance = handler.instance
    assert instance.execution_time == handler_time
    assert instance.status == expected_status
    assert instance.reason == expected_reason

    # Scenarios that do not expect the call make no claim about it.
    if not do_add_missing:
        return
    add_missing.assert_called_with('blocked', expected_reason)
1228
1229
# Parameters for test_devicehandler_create_serial_connection, as tuples of
# (serial_device, serial_pty, ser_pty_process, expected_exception,
#  expected_result, terminate_ser_pty_process, make_available):
#   serial_device             - device name passed to the handler
#   serial_pty                - serial PTY value passed to the handler
#   ser_pty_process           - PTY process (mock) passed to the handler
#   expected_exception        - exception raised by the patched serial.Serial
#                               and expected to propagate, or None
#   expected_result           - value returned by the patched serial.Serial
#   terminate_ser_pty_process - whether the PTY process must be terminated
#   make_available            - not read by the test body
TESTDATA_15 = [
    ('dummy device', 'dummy pty', None, None, True, False, False),
    (
        'dummy device',
        'dummy pty',
        mock.Mock(communicate=mock.Mock(return_value=('', ''))),
        SerialException,
        False,
        True,
        'dummy pty'
    ),
    (
        'dummy device',
        None,
        None,
        SerialException,
        False,
        False,
        'dummy device'
    )
]
1251
@pytest.mark.parametrize(
    'serial_device, serial_pty, ser_pty_process, expected_exception,' \
    ' expected_result, terminate_ser_pty_process, make_available',
    TESTDATA_15,
    ids=['valid', 'serial pty process', 'no serial pty']
)
def test_devicehandler_create_serial_connection(
    mocked_instance,
    serial_device,
    serial_pty,
    ser_pty_process,
    expected_exception,
    expected_result,
    terminate_ser_pty_process,
    make_available
):
    """Check _create_serial_connection() on success and on serial failure.

    serial.Serial is patched to either return ``expected_result`` or raise
    ``expected_exception``. In the failing scenarios the test checks the
    instance status and reason, the DUT bookkeeping and, when a PTY process
    was supplied, that it got terminated.
    """
    def mock_serial(*args, **kwargs):
        if expected_exception:
            raise expected_exception('')
        return expected_result

    handler = DeviceHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1))
    missing_mock = mock.Mock()
    handler.instance.add_missing_case_status = missing_mock
    terminate_mock = mock.Mock()

    dut = DUT()
    dut.available = 0
    dut.failures = 0
    handler.duts = [dut]

    hardware_baud = 14400
    flash_timeout = 60
    serial_mock = mock.Mock(side_effect=mock_serial)

    # terminate_process is patched through a context manager so the module
    # attribute is restored afterwards instead of leaking into other tests.
    with mock.patch('serial.Serial', serial_mock), \
         mock.patch.object(twisterlib.handlers, 'terminate_process',
                           terminate_mock), \
         pytest.raises(expected_exception) if expected_exception else \
         nullcontext():
        result = handler._create_serial_connection(dut, serial_device, hardware_baud,
                                                   flash_timeout, serial_pty,
                                                   ser_pty_process)

    if expected_result:
        assert result is not None
        assert dut.failures == 0

    if expected_exception:
        assert handler.instance.status == TwisterStatus.FAIL
        assert handler.instance.reason == 'Serial Device Error'
        assert dut.available == 1
        assert dut.failures == 1
        missing_mock.assert_called_once_with('blocked', 'Serial Device Error')

    if terminate_ser_pty_process:
        terminate_mock.assert_called_once()
        ser_pty_process.communicate.assert_called_once()
1308
1309
# Parameters for test_devicehandler_get_serial_device, as tuples of
# (serial_pty, popen_exception, expected_device):
#   serial_pty      - serial PTY command string passed to the handler
#   popen_exception - exception type raised by the patched subprocess.Popen,
#                     or None
#   expected_device - expected first element of the returned value
TESTDATA_16 = [
    ('dummy1 dummy2', None, 'slave name'),
    ('dummy1,dummy2', CalledProcessError, None),
    (None, None, 'dummy hardware serial'),
]
1315
@pytest.mark.parametrize(
    'serial_pty, popen_exception, expected_device',
    TESTDATA_16,
    ids=['pty', 'pty process error', 'no pty']
)
def test_devicehandler_get_serial_device(
    mocked_instance,
    serial_pty,
    popen_exception,
    expected_device
):
    """Check _get_serial_device() with and without a serial PTY command.

    subprocess.Popen, pty.openpty and os.ttyname are all patched, so no
    real process or pseudo-terminal is created.
    """
    def fake_popen(command, *args, **kwargs):
        # Both PTY strings in the test data must yield this argument list.
        assert command == ['dummy1', 'dummy2']
        if not popen_exception:
            return mock.Mock()
        raise popen_exception(command, 'Dummy error')

    def fake_ttyname(fd):
        return fd + ' name'

    hardware_serial = 'dummy hardware serial'
    handler = DeviceHandler(mocked_instance, 'build', mock.Mock())

    patches = (
        mock.patch('subprocess.Popen', mock.Mock(side_effect=fake_popen)),
        mock.patch('pty.openpty', mock.Mock(return_value=('master', 'slave'))),
        mock.patch('os.ttyname', mock.Mock(side_effect=fake_ttyname)),
    )

    with patches[0], patches[1], patches[2]:
        result = handler._get_serial_device(serial_pty, hardware_serial)

    if not popen_exception:
        assert result[0] == expected_device
    else:
        assert result is None
1349
# Parameters for test_devicehandler_handle, as tuples of
# (has_hardware, raise_create_serial, raise_popen, raise_timeout,
#  returncode, do_timeout_thread, use_pty,
#  expected_status, expected_reason, expected_logs):
#   has_hardware        - whether get_hardware() returns a hardware mock
#   raise_create_serial - make _create_serial_connection() raise
#                         SerialException
#   raise_popen         - make the patched subprocess.Popen raise
#                         CalledProcessError
#   raise_timeout       - make the process' communicate() raise
#                         TimeoutExpired
#   returncode          - return code of the mocked process
#   do_timeout_thread   - value reported by the mocked thread's is_alive()
#   use_pty             - whether the hardware mock has a serial_pty set
#   expected_status     - instance status expected afterwards (if truthy)
#   expected_reason     - instance reason expected afterwards (if truthy)
#   expected_logs       - messages that must appear among the log records
TESTDATA_17 = [
    (False, False, False, False, None, False, False,
     TwisterStatus.NONE, None, []),
    (True, True, False, False, None, False, False,
     TwisterStatus.NONE, None, []),
    (True, False, True, False, None, False, False,
     TwisterStatus.ERROR, 'Device issue (Flash error)', []),
    (True, False, False, True, None, False, False,
     TwisterStatus.ERROR, 'Device issue (Timeout)', ['Flash operation timed out.']),
    (True, False, False, False, 1, False, False,
     TwisterStatus.ERROR, 'Device issue (Flash error?)', []),
    (True, False, False, False, 0, True, False,
     TwisterStatus.NONE, None, ['Timed out while monitoring serial output on IPName']),
    (True, False, False, False, 0, False, True,
     TwisterStatus.NONE, None, ["Terminating serial-pty:'Serial PTY'",
                                "Terminated serial-pty:'Serial PTY', stdout:'', stderr:''"]),
]
1367
@pytest.mark.parametrize(
    'has_hardware, raise_create_serial, raise_popen, raise_timeout,' \
    ' returncode, do_timeout_thread, use_pty,' \
    ' expected_status, expected_reason, expected_logs',
    TESTDATA_17,
    ids=['no hardware', 'create serial failure', 'popen called process error',
         'communicate timeout', 'nonzero returncode', 'valid pty', 'valid dev']
)
def test_devicehandler_handle(
    mocked_instance,
    caplog,
    has_hardware,
    raise_create_serial,
    raise_popen,
    raise_timeout,
    returncode,
    do_timeout_thread,
    use_pty,
    expected_status,
    expected_reason,
    expected_logs
):
    """Drive DeviceHandler.handle() with all of its collaborators mocked.

    The handler's helper methods are replaced with mocks and subprocess,
    threading and file access are patched, so each scenario only selects
    which of those fakes fails. Afterwards the test checks the log
    messages, the custom script invocations, the instance status and
    reason, and that the DUT was released.
    """
    def mock_get_serial(serial_pty, hardware_serial):
        if serial_pty:
            serial_pty_process = mock.Mock(
                name='dummy serial PTY process',
                communicate=mock.Mock(
                    return_value=('', '')
                )
            )
            return 'dummy serial PTY device', serial_pty_process
        return 'dummy serial device', None

    def mock_create_serial(*args, **kwargs):
        if raise_create_serial:
            raise SerialException('dummy cmd', 'dummy msg')
        return mock.Mock(name='dummy serial')

    def mock_thread(*args, **kwargs):
        # A thread that is still "alive" stands for a monitoring timeout.
        is_alive_mock = mock.Mock(return_value=bool(do_timeout_thread))

        return mock.Mock(is_alive=is_alive_mock)

    def mock_terminate(proc, *args, **kwargs):
        proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock()))

    def mock_communicate(*args, **kwargs):
        if raise_timeout:
            raise TimeoutExpired('dummy cmd', 'dummyamount')
        return mock.Mock(), mock.Mock()

    def mock_popen(command, *args, **kwargs):
        if raise_popen:
            raise CalledProcessError('dummy proc', 'dummy msg')

        mock_process = mock.Mock(
            pid=1,
            returncode=returncode,
            communicate=mock.Mock(side_effect=mock_communicate)
        )

        # The returned object works as a context manager yielding the
        # process mock.
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock_process),
            __exit__=mock.Mock(return_value=None)
        )

    hardware = None if not has_hardware else mock.Mock(
        baud=14400,
        runner='dummy runner',
        serial_pty='Serial PTY' if use_pty else None,
        serial='dummy serial',
        pre_script='dummy pre script',
        post_script='dummy post script',
        post_flash_script='dummy post flash script',
        flash_timeout=60,
        flash_with_test=True
    )

    handler = DeviceHandler(mocked_instance, 'build', mock.Mock())
    handler.get_hardware = mock.Mock(return_value=hardware)
    handler.options = mock.Mock(
        timeout_multiplier=1,
        west_flash=None,
        west_runner=None
    )
    handler._get_serial_device = mock.Mock(side_effect=mock_get_serial)
    handler._create_command = mock.Mock(return_value=['dummy', 'command'])
    handler.run_custom_script = mock.Mock()
    handler._create_serial_connection = mock.Mock(
        side_effect=mock_create_serial
    )
    handler.monitor_serial = mock.Mock()
    handler.terminate = mock.Mock(side_effect=mock_terminate)
    handler._update_instance_info = mock.Mock()
    handler._final_handle_actions = mock.Mock()
    handler.make_dut_available = mock.Mock()
    handler.instance.platform.name = 'IPName'

    harness = mock.Mock()

    # terminate_process is patched through a context manager so the module
    # attribute is restored afterwards instead of leaking into other tests.
    with mock.patch.object(twisterlib.handlers, 'terminate_process',
                           mock.Mock()), \
         mock.patch('builtins.open', mock.mock_open(read_data='')), \
         mock.patch('subprocess.Popen', side_effect=mock_popen), \
         mock.patch('threading.Event', mock.Mock()), \
         mock.patch('threading.Thread', side_effect=mock_thread):
        handler.handle(harness)

    handler.get_hardware.assert_called_once()

    messages = [record.msg for record in caplog.records]
    assert all([msg in messages for msg in expected_logs])

    # Without hardware there is nothing further to verify.
    if not has_hardware:
        return

    handler.run_custom_script.assert_has_calls([
        mock.call('dummy pre script', mock.ANY)
    ])

    # The remaining checks only apply once a serial connection was created.
    if raise_create_serial:
        return

    handler.run_custom_script.assert_has_calls([
        mock.call('dummy pre script', mock.ANY),
        mock.call('dummy post flash script', mock.ANY),
        mock.call('dummy post script', mock.ANY)
    ])

    if expected_reason:
        assert handler.instance.reason == expected_reason
    if expected_status:
        assert handler.instance.status == expected_status

    handler.make_dut_available.assert_called_once_with(hardware)
1502
1503
# Parameters for test_qemuhandler_init, as tuples of
# (ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof):
#   ignore_qemu_crash              - value set on the mocked testsuite
#   expected_ignore_crash          - expected handler.ignore_qemu_crash
#   expected_ignore_unexpected_eof - expected handler.ignore_unexpected_eof
TESTDATA_18 = [
    (True, True, True),
    (False, False, False),
]
1508
@pytest.mark.parametrize(
    'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof',
    TESTDATA_18,
    ids=['ignore crash', 'qemu crash']
)
def test_qemuhandler_init(
    mocked_instance,
    ignore_qemu_crash,
    expected_ignore_crash,
    expected_ignore_unexpected_eof
):
    """Check the crash/EOF flags a new QEMUHandler takes from its testsuite."""
    testsuite = mocked_instance.testsuite
    testsuite.ignore_qemu_crash = ignore_qemu_crash

    qemu_handler = QEMUHandler(mocked_instance, 'build', mock.Mock())

    crash_flag = qemu_handler.ignore_qemu_crash
    eof_flag = qemu_handler.ignore_unexpected_eof

    assert crash_flag == expected_ignore_crash
    assert eof_flag == expected_ignore_unexpected_eof
1526
1527
def test_qemuhandler_get_cpu_time():
    """Check that _get_cpu_time() adds up the user and system CPU times."""
    cpu_times = mock.Mock(user=20.0, system=64.0)

    def fake_process(pid):
        # Stand-in for psutil.Process: cpu_times() reports the fixed values.
        process = mock.Mock()
        process.cpu_times = mock.Mock(return_value=cpu_times)
        return process

    with mock.patch('psutil.Process', fake_process):
        total = QEMUHandler._get_cpu_time(0)

    assert total == pytest.approx(84.0)
1543
1544
# Parameters for test_qemuhandler_get_default_domain_build_dir, as tuples of
# (self_sysbuild, self_build_dir, build_dir, expected):
#   self_sysbuild  - value assigned to handler.instance.sysbuild
#   self_build_dir - value assigned to handler.build_dir
#   build_dir      - PropertyMock installed as the default domain's build_dir
#   expected       - expected result of get_default_domain_build_dir()
TESTDATA_19 = [
    (
        True,
        os.path.join('self', 'dummy_dir', '1'),
        mock.PropertyMock(return_value=os.path.join('dummy_dir', '1')),
        os.path.join('dummy_dir', '1')
    ),
    (
        False,
        os.path.join('self', 'dummy_dir', '2'),
        mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')),
        os.path.join('self', 'dummy_dir', '2')
    ),
]
1559
@pytest.mark.parametrize(
    'self_sysbuild, self_build_dir, build_dir, expected',
    TESTDATA_19,
    ids=['domains build dir', 'self build dir']
)
def test_qemuhandler_get_default_domain_build_dir(
    mocked_instance,
    self_sysbuild,
    self_build_dir,
    build_dir,
    expected
):
    """Check which build directory get_default_domain_build_dir() returns.

    Domains.from_file is patched to return a mock whose default domain
    exposes ``build_dir`` through the parametrized PropertyMock.
    """
    get_default_domain_mock = mock.Mock()
    # A PropertyMock has to be installed on the type of the object that
    # get_default_domain() returns, not on the instance itself.
    default_domain = get_default_domain_mock.return_value
    type(default_domain).build_dir = build_dir

    domains = mock.Mock(get_default_domain=get_default_domain_mock)

    handler = QEMUHandler(mocked_instance, 'build', mock.Mock())
    handler.instance.sysbuild = self_sysbuild
    handler.build_dir = self_build_dir

    with mock.patch('domains.Domains.from_file',
                    mock.Mock(return_value=domains)):
        actual = handler.get_default_domain_build_dir()

    assert actual == expected
1585
1586
# Parameters for test_qemuhandler_set_qemu_filenames, as tuples of
# (self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn):
#   self_log           - value assigned to handler.log
#   self_pid_fn        - value assigned to handler.pid_fn before the call
#   sysbuild_build_dir - argument passed to _set_qemu_filenames()
#   exists_pid_fn      - value returned by the patched os.path.exists
TESTDATA_20 = [
    (
        os.path.join('self', 'dummy_dir', 'log1'),
        os.path.join('self', 'dummy_dir', 'pid1'),
        os.path.join('sysbuild', 'dummy_dir', 'bd1'),
        True
    ),
    (
        os.path.join('self', 'dummy_dir', 'log2'),
        os.path.join('self', 'dummy_dir', 'pid2'),
        os.path.join('sysbuild', 'dummy_dir', 'bd2'),
        False
    ),
]
1601
@pytest.mark.parametrize(
    'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn',
    TESTDATA_20,
    ids=['pid exists', 'pid missing']
)
def test_qemuhandler_set_qemu_filenames(
    mocked_instance,
    self_log,
    self_pid_fn,
    sysbuild_build_dir,
    exists_pid_fn
):
    """Check the file names set by _set_qemu_filenames().

    os.path.exists and os.unlink are patched, so the test only observes
    whether removal of the pid file was requested.
    """
    unlink_mock = mock.Mock()
    exists_mock = mock.Mock(return_value=exists_pid_fn)

    handler = QEMUHandler(mocked_instance, 'build', mock.Mock())
    handler.log = self_log
    handler.pid_fn = self_pid_fn

    expected_pid_fn = sysbuild_build_dir + os.path.sep + 'qemu.pid'

    with mock.patch('os.unlink', unlink_mock), \
         mock.patch('os.path.exists', exists_mock):
        handler._set_qemu_filenames(sysbuild_build_dir)

    assert handler.fifo_fn == mocked_instance.build_dir + \
                              os.path.sep + 'qemu-fifo'

    assert handler.pid_fn == expected_pid_fn

    assert handler.log_fn == self_log

    if exists_pid_fn:
        unlink_mock.assert_called_once_with(expected_pid_fn)
    else:
        # Nothing is reported as existing, so nothing may be removed.
        unlink_mock.assert_not_called()
1635
1636
def test_qemuhandler_create_command(mocked_instance):
    """Check the run command _create_command() builds for a build directory."""
    sysbuild_build_dir = os.path.join('sysbuild', 'dummy_dir')
    expected_command = ['dummy_cmd', '-C', sysbuild_build_dir, 'run']

    handler = QEMUHandler(mocked_instance, 'build', mock.Mock(), 'dummy_cmd', False)

    assert handler._create_command(sysbuild_build_dir) == expected_command
1646
1647
# Parameters for test_qemuhandler_update_instance_info, as tuples of
# (self_returncode, self_ignore_qemu_crash, self_instance_reason,
#  harness_status, is_timeout,
#  expected_status, expected_reason, expected_called_missing_case):
#   self_returncode              - value assigned to handler.returncode
#   self_ignore_qemu_crash       - value assigned to handler.ignore_qemu_crash
#   self_instance_reason         - instance reason set before the call
#   harness_status               - status attribute of the harness mock
#   is_timeout                   - flag passed to _update_instance_info()
#   expected_status              - instance status expected afterwards
#   expected_reason              - instance reason expected afterwards
#   expected_called_missing_case - whether add_missing_case_status() must
#                                  have been called with TwisterStatus.BLOCK
TESTDATA_21 = [
    (
        0,
        False,
        None,
        'good dummy status',
        False,
        TwisterStatus.NONE,
        None,
        False
    ),
    (
        1,
        True,
        None,
        'good dummy status',
        False,
        TwisterStatus.NONE,
        None,
        False
    ),
    (
        0,
        False,
        None,
        TwisterStatus.NONE,
        True,
        TwisterStatus.FAIL,
        'Timeout',
        True
    ),
    (
        1,
        False,
        None,
        TwisterStatus.NONE,
        False,
        TwisterStatus.FAIL,
        'Exited with 1',
        True
    ),
    (
        1,
        False,
        'preexisting reason',
        'good dummy status',
        False,
        TwisterStatus.FAIL,
        'preexisting reason',
        True
    ),
]
1700
@pytest.mark.parametrize(
    'self_returncode, self_ignore_qemu_crash,' \
    ' self_instance_reason, harness_status, is_timeout,' \
    ' expected_status, expected_reason, expected_called_missing_case',
    TESTDATA_21,
    ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail']
)
def test_qemuhandler_update_instance_info(
    mocked_instance,
    self_returncode,
    self_ignore_qemu_crash,
    self_instance_reason,
    harness_status,
    is_timeout,
    expected_status,
    expected_reason,
    expected_called_missing_case
):
    """Check the instance status and reason set by _update_instance_info()."""
    add_missing = mock.Mock()
    mocked_instance.add_missing_case_status = add_missing
    mocked_instance.reason = self_instance_reason
    harness = mock.Mock(status=harness_status, reason="foobar")

    handler = QEMUHandler(mocked_instance, 'build', mock.Mock())
    handler.returncode = self_returncode
    handler.ignore_qemu_crash = self_ignore_qemu_crash

    handler._update_instance_info(harness, is_timeout)

    instance = handler.instance
    assert instance.status == expected_status
    assert instance.reason == expected_reason

    # Scenarios that do not expect the call make no claim about it.
    if not expected_called_missing_case:
        return
    add_missing.assert_called_once_with(TwisterStatus.BLOCK)
1736
1737
def test_qemuhandler_thread_get_fifo_names():
    """Check the '.in' and '.out' names derived from a FIFO base name."""
    names = QEMUHandler._thread_get_fifo_names('dummy')

    # The unpacking also verifies that exactly two names come back.
    name_in, name_out = names

    assert name_in == 'dummy.in'
    assert name_out == 'dummy.out'
1745
1746
# Parameters for test_qemuhandler_thread_update_instance_info, as tuples of
# (_status, _reason, expected_status, expected_reason):
#   _status         - status passed to _thread_update_instance_info()
#   _reason         - reason passed to _thread_update_instance_info()
#   expected_status - instance status expected afterwards
#   expected_reason - instance reason expected afterwards
TESTDATA_24 = [
    (TwisterStatus.FAIL, 'timeout', TwisterStatus.FAIL, 'timeout'),
    (TwisterStatus.FAIL, 'Execution error', TwisterStatus.FAIL, 'Execution error'),
    (TwisterStatus.FAIL, 'unexpected eof', TwisterStatus.FAIL, 'unexpected eof'),
    (TwisterStatus.FAIL, 'unexpected byte', TwisterStatus.FAIL, 'unexpected byte'),
    (TwisterStatus.NONE, None, TwisterStatus.NONE, 'Unknown'),
]
1754
@pytest.mark.parametrize(
    '_status, _reason, expected_status, expected_reason',
    TESTDATA_24,
    ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown']
)
def test_qemuhandler_thread_update_instance_info(
    mocked_instance,
    _status,
    _reason,
    expected_status,
    expected_reason
):
    """Check the instance fields set by _thread_update_instance_info()."""
    handler_time = 59
    handler = QEMUHandler(mocked_instance, 'build', mock.Mock())

    # The method is called through the class with the handler passed in
    # explicitly as its first argument.
    QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason)

    instance = handler.instance
    assert instance.execution_time == handler_time
    assert instance.status == expected_status
    assert instance.reason == expected_reason
1776
1777
# Parameters for test_qemuhandler_thread, as tuples of
# (content, timeout, pid, harness_statuses, cputime, capture_coverage,
#  expected_status, expected_reason, expected_log_calls):
#   content            - bytes served when the FIFO '.out' file is opened
#   timeout            - testsuite timeout
#   pid                - number served as the contents of the pid file
#   harness_statuses   - successive values returned by harness.status
#   cputime            - CPU time reported for a positive pid: either an int
#                        or an iterator advanced once per query
#   capture_coverage   - capture_coverage attribute of the harness mock
#   expected_status    - status expected to be passed on to
#                        _thread_update_instance_info()
#   expected_reason    - listed for reference; the test accepts any reason
#   expected_log_calls - write() calls expected on the log file object
TESTDATA_25 = [
    (
        ('1\n' * 60).encode('utf-8'),
        60,
        1,
        [TwisterStatus.NONE] * 60 + [TwisterStatus.PASS] * 6,
        1000,
        False,
        TwisterStatus.FAIL,
        'timeout',
        [mock.call('1\n'), mock.call('1\n')]
    ),
    (
        ('1\n' * 60).encode('utf-8'),
        60,
        -1,
        [TwisterStatus.NONE] * 60 + [TwisterStatus.PASS] * 30,
        100,
        False,
        TwisterStatus.FAIL,
        None,
        [mock.call('1\n'), mock.call('1\n')]
    ),
    (
        b'',
        60,
        1,
        [TwisterStatus.PASS] * 3,
        100,
        False,
        TwisterStatus.FAIL,
        'unexpected eof',
        []
    ),
    (
        b'\x81',
        60,
        1,
        [TwisterStatus.PASS] * 3,
        100,
        False,
        TwisterStatus.FAIL,
        'unexpected byte',
        []
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        600,
        1,
        [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7,
        100,
        False,
        TwisterStatus.PASS,
        None,
        [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')]
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        600,
        0,
        [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7,
        100,
        False,
        TwisterStatus.FAIL,
        'timeout',
        [mock.call('1\n'), mock.call('2\n')]
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        60,
        1,
        [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7,
        (n for n in [100, 100, 10000]),
        True,
        TwisterStatus.PASS,
        None,
        [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')]
    ),
]
1857
@pytest.mark.parametrize(
    'content, timeout, pid, harness_statuses, cputime, capture_coverage,' \
    ' expected_status, expected_reason, expected_log_calls',
    TESTDATA_25,
    ids=[
        'timeout',
        'harness failed',
        'unexpected eof',
        'unexpected byte',
        'harness success',
        'timeout by pid=0',
        'capture_coverage'
    ]
)
def test_qemuhandler_thread(
    mocked_instance,
    faux_timer,
    content,
    timeout,
    pid,
    harness_statuses,
    cputime,
    capture_coverage,
    expected_status,
    expected_reason,
    expected_log_calls
):
    """Run QEMUHandler._thread() against faked files, clock and poller.

    File access, time.time, select.poll and several os functions are
    patched, so the thread body only ever sees the parametrized FIFO
    content, pid and CPU times. The test then checks the status handed to
    _thread_update_instance_info() and the lines written to the log file.
    The reason argument is accepted as-is (``mock.ANY``).
    """
    def mock_cputime(pid):
        if pid > 0:
            # An int is a constant reading; anything else is an iterator
            # that yields one reading per query.
            return cputime if isinstance(cputime, int) else next(cputime)
        else:
            raise ProcessLookupError()

    type(mocked_instance.testsuite).timeout = mock.PropertyMock(return_value=timeout)
    handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1))
    handler.ignore_unexpected_eof = False
    handler.pid_fn = 'pid_fn'
    handler.fifo_fn = 'fifo_fn'

    file_objs = {}

    def mocked_open(filename, *args, **kwargs):
        if filename == handler.pid_fn:
            contents = str(pid).encode('utf-8')
        elif filename == handler.fifo_fn + '.out':
            contents = content
        else:
            contents = b''

        file_object = mock.mock_open(read_data=contents).return_value
        file_object.__iter__.return_value = contents.splitlines(True)
        # Keep the most recent file object per file name. The previous
        # guard tested the file object against the dict's filename keys and
        # was therefore always true, so this assignment is equivalent.
        file_objs[filename] = file_object

        return file_object

    harness = mock.Mock(capture_coverage=capture_coverage, handle=print)
    # Each read of harness.status yields the next parametrized value.
    type(harness).status = mock.PropertyMock(side_effect=harness_statuses)

    p = mock.Mock()
    p.poll = mock.Mock(
        side_effect=itertools.cycle([True, True, True, True, False])
    )

    mock_thread_get_fifo_names = mock.Mock(
        return_value=('fifo_fn.in', 'fifo_fn.out')
    )
    mock_thread_update_instance_info = mock.Mock()

    with mock.patch('time.time', side_effect=faux_timer.time), \
         mock.patch('builtins.open', new=mocked_open), \
         mock.patch('select.poll', return_value=p), \
         mock.patch('os.path.exists', return_value=True), \
         mock.patch('os.unlink', mock.Mock()), \
         mock.patch('os.mkfifo', mock.Mock()), \
         mock.patch('os.kill', mock.Mock()), \
         mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time',
                    mock_cputime), \
         mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names',
                    mock_thread_get_fifo_names), \
         mock.patch('twisterlib.handlers.QEMUHandler.' \
                    '_thread_update_instance_info',
                    mock_thread_update_instance_info):
        QEMUHandler._thread(
            handler,
            handler.get_test_timeout(),
            handler.build_dir,
            handler.log,
            handler.fifo_fn,
            handler.pid_fn,
            harness,
            handler.ignore_unexpected_eof
        )

    mock_thread_update_instance_info.assert_called_once_with(
        handler,
        mock.ANY,
        expected_status,
        mock.ANY
    )

    file_objs[handler.log].write.assert_has_calls(expected_log_calls)
1960
1961
# Parameter sets for test_qemuhandler_handle. Each tuple holds, in order:
#   isatty, do_timeout, harness_status, exists_pid_fn, expected_logs
TESTDATA_26 = [
    (
        True,
        False,
        TwisterStatus.NONE,
        True,
        [
            'No timeout, return code from QEMU (1): 1',
            'return code from QEMU (1): 1',
        ],
    ),
    (
        False,
        True,
        TwisterStatus.PASS,
        True,
        ['return code from QEMU (1): 0'],
    ),
    (
        False,
        True,
        TwisterStatus.FAIL,
        False,
        ['return code from QEMU (None): 1'],
    ),
]
1969
@pytest.mark.parametrize(
    'isatty, do_timeout, harness_status, exists_pid_fn, expected_logs',
    TESTDATA_26,
    ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn']
)
def test_qemuhandler_handle(
    mocked_instance,
    caplog,
    tmp_path,
    isatty,
    do_timeout,
    harness_status,
    exists_pid_fn,
    expected_logs
):
    """Run QEMUHandler.handle() with all process plumbing mocked out.

    subprocess.Popen, threading.Thread, open, os.path.exists, os.unlink and
    sys.stdout.isatty are patched, and the handler's helper methods are
    replaced by mocks, so no real process is started. The test passes when
    every entry of expected_logs is found in the captured log text.
    """
    def fake_wait(*args, **kwargs):
        # Only the timeout scenarios make the process wait() blow up.
        if not do_timeout:
            return None
        raise TimeoutExpired('dummy cmd', 'dummyamount')

    fake_process = mock.Mock(
        pid=0,
        returncode=1,
        communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock())),
        wait=mock.Mock(side_effect=fake_wait)
    )

    handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1))

    def fake_path_exists(name, *args, **kwargs):
        # Every path lookup answers with the parametrized pid-file flag.
        return exists_pid_fn

    def fake_popen(command, stdout=None, stdin=None, stderr=None, cwd=None):
        # Context manager whose __enter__ hands back the fake process.
        popen_context = mock.Mock(
            __enter__=mock.Mock(return_value=fake_process),
            __exit__=mock.Mock(return_value=None),
            communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock()))
        )
        return popen_context

    def fake_thread(name=None, target=None, daemon=None, args=None):
        # The monitoring thread is never really created.
        return mock.Mock()

    def fake_set_filenames(sysbuild_build_dir):
        # Stand-in for _set_qemu_filenames: fill in the paths on the handler.
        handler.fifo_fn = os.path.join('dummy', 'qemu-fifo')
        handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid')
        handler.log_fn = os.path.join('dummy', 'log')

    harness = mock.Mock(status=harness_status)

    handler.options = mock.Mock(
        timeout_multiplier=1,
        west_flash=[],
        west_runner=None
    )

    # Helpers that only need to be callable and return None.
    for stub_name in (
        'run_custom_script',
        'make_device_available',
        '_final_handle_actions',
    ):
        setattr(handler, stub_name, mock.Mock(return_value=None))

    handler._create_command = mock.Mock(
        return_value=['generator_cmd', '-C', os.path.join('cmd', 'path'), 'run']
    )
    handler._set_qemu_filenames = mock.Mock(side_effect=fake_set_filenames)
    handler.get_default_domain_build_dir = mock.Mock(
        return_value=os.path.join('sysbuild', 'dummydir')
    )
    handler.terminate = mock.Mock()

    with mock.patch('subprocess.Popen', side_effect=fake_popen), \
         mock.patch('builtins.open', mock.mock_open(read_data='1')), \
         mock.patch('threading.Thread', side_effect=fake_thread), \
         mock.patch('os.path.exists', side_effect=fake_path_exists), \
         mock.patch('os.unlink', mock.Mock()), \
         mock.patch('sys.stdout.isatty', return_value=isatty):
        handler.handle(harness)

    missing_logs = [
        expected_log for expected_log in expected_logs
        if expected_log not in caplog.text
    ]
    assert not missing_logs
2045
2046
def test_qemuhandler_get_fifo(mocked_instance):
    """get_fifo() must hand back the value stored in the fifo_fn attribute."""
    expected_fifo = 'fifo_fn'
    options = mock.Mock(timeout_multiplier=1)

    qemu_handler = QEMUHandler(mocked_instance, 'build', options)
    qemu_handler.fifo_fn = expected_fifo

    assert qemu_handler.get_fifo() == expected_fifo
2054