1#!/usr/bin/env python3
2# Copyright (c) 2023 Intel Corporation
3#
4# SPDX-License-Identifier: Apache-2.0
5"""
6Tests for handlers.py classes' methods
7"""
8
9import itertools
10import mock
11import os
12import pytest
13import signal
14import subprocess
15import sys
16
17from contextlib import nullcontext
18from importlib import reload
19from serial import SerialException
20from subprocess import CalledProcessError, TimeoutExpired
21
22import twisterlib.harness
23
24from conftest import ZEPHYR_BASE
25from twisterlib.error import TwisterException
26from twisterlib.handlers import (
27    Handler,
28    BinaryHandler,
29    DeviceHandler,
30    QEMUHandler,
31    SimulationHandler
32)
33
34
@pytest.fixture
def mocked_instance(tmp_path):
    """Provide a mock test instance for the handler tests.

    The returned mock exposes:
      * ``testsuite`` - a mock with an empty ``source_dir`` and a
        ``timeout`` of 60,
      * ``platform`` - a mock with no ``binaries`` and a
        ``timeout_multiplier`` of 2,
      * ``build_dir`` - the path (as ``str``) of a directory created under
        pytest's ``tmp_path``,
      * ``status`` set to ``None`` and ``reason`` set to ``'Unknown'``.
    """
    build_path = tmp_path / 'build_dir'
    os.makedirs(build_path)

    # Properties have to be installed on the mock's type, not the instance.
    suite_mock = mock.Mock()
    suite_cls = type(suite_mock)
    suite_cls.source_dir = mock.PropertyMock(return_value='')
    suite_cls.timeout = mock.PropertyMock(return_value=60)

    platform_mock = mock.Mock()
    platform_cls = type(platform_mock)
    platform_cls.binaries = mock.PropertyMock(return_value=[])
    platform_cls.timeout_multiplier = mock.PropertyMock(return_value=2)

    instance = mock.Mock()
    type(instance).build_dir = mock.PropertyMock(
        return_value=str(build_path)
    )
    instance.testsuite = suite_mock
    instance.platform = platform_mock
    instance.status = None
    instance.reason = 'Unknown'

    return instance
60
61
@pytest.fixture
def faux_timer():
    """Provide a fake clock whose ``time()`` advances by one on every call.

    The first call returns 1, the next 2, and so on; the current value is
    kept in the ``t`` attribute.
    """
    class TickingClock:
        # Starting value; the first time() call turns it into an
        # instance attribute holding 1.
        t = 0

        def time(self):
            self.t = self.t + 1
            return self.t

    return TickingClock()
73
74
# Parameters for test_imports.
# Columns: fail_serial, fail_pty, os_name, expected_outs, expected_error
TESTDATA_1 = [
    # 'import serial': only the serial import fails
    (True, False, 'posix', ['Install pyserial python module with pip to use' \
     ' --device-testing option.'], None),
    # 'import pty nt': only the pty import fails, with os.name set to 'nt'
    (False, True, 'nt', [], None),
    # 'import serial+pty posix': both imports fail, ImportError is expected
    (True, True, 'posix', ['Install pyserial python module with pip to use' \
     ' --device-testing option.'], ImportError),
]
82
@pytest.mark.parametrize(
    'fail_serial, fail_pty, os_name, expected_outs, expected_error',
    TESTDATA_1,
    ids=['import serial', 'import pty nt', 'import serial+pty posix']
)
def test_imports(
    capfd,
    fail_serial,
    fail_pty,
    os_name,
    expected_outs,
    expected_error
):
    """Reload twisterlib.handlers with failing 'serial'/'pty' imports.

    The selected modules are hidden from ``sys.modules`` and a finder that
    raises ImportError for them is put first on ``sys.meta_path``. The test
    then checks the captured stdout and, when ``expected_error`` is set,
    that the reload raises it.
    """
    blocked = (('serial', fail_serial), ('pty', fail_pty))

    class ImportRaiser:
        def find_spec(self, fullname, path, target=None):
            for module_name, should_fail in blocked:
                if fullname == module_name and should_fail:
                    raise ImportError()

    patched_modules = dict(sys.modules)
    for module_name, should_fail in blocked:
        # A None entry in sys.modules makes "import <name>" fail.
        patched_modules[module_name] = \
            None if should_fail else patched_modules[module_name]

    patched_meta_path = [ImportRaiser(), *sys.meta_path]

    if expected_error:
        raises_ctx = pytest.raises(expected_error)
    else:
        raises_ctx = nullcontext()

    with mock.patch('os.name', os_name), \
         mock.patch.dict('sys.modules', patched_modules, clear=True), \
         mock.patch('sys.meta_path', patched_meta_path), \
         raises_ctx:
        reload(twisterlib.handlers)

    captured = capfd.readouterr()
    for expected_out in expected_outs:
        assert expected_out in captured.out
118
119
def test_handler_final_handle_actions(mocked_instance):
    """Check _final_handle_actions() with a harness whose run ID mismatches.

    The first call must mark the instance and its testcases as failed with
    the 'RunID mismatch' reason; a second call must leave a reason that was
    set in the meantime untouched.
    """
    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance)
    handler.suite_name_check = True

    harness = twisterlib.harness.Test()
    harness.run_id_exists = True
    harness.matched_run_id = False
    harness.state = mock.Mock()
    harness.detected_suite_names = mock.Mock()

    handler_time = mock.Mock()

    handler._final_handle_actions(harness, handler_time)

    assert handler.instance.status == 'failed'
    assert handler.instance.execution_time == handler_time
    assert handler.instance.reason == 'RunID mismatch'
    for testcase in handler.instance.testcases:
        assert testcase.status == 'failed'

    kept_reason = 'This reason shan\'t be changed.'
    handler.instance.reason = kept_reason

    handler._final_handle_actions(harness, handler_time)

    assert handler.instance.reason == kept_reason
147
148
# Parameters for test_handler_verify_ztest_suite_name.
# Columns: detected_suite_names, should_be_called
# (should_be_called: whether Handler._missing_suite_name must be invoked;
# the test sets the expected suite names to ['dummy_testsuite_name'].)
TESTDATA_2 = [
    (['dummy_testsuite_name'], False),
    ([], True),
    (['another_dummy_name', 'yet_another_dummy_name'], True),
]
154
@pytest.mark.parametrize(
    'detected_suite_names, should_be_called',
    TESTDATA_2,
    ids=['detected one expected', 'detected none', 'detected two unexpected']
)
def test_handler_verify_ztest_suite_name(
    mocked_instance,
    detected_suite_names,
    should_be_called
):
    """Check when _verify_ztest_suite_name() reports a missing suite name.

    The testsuite expects 'dummy_testsuite_name'; Handler._missing_suite_name
    is patched out and must be called exactly once or not at all, depending
    on ``should_be_called``.
    """
    type(mocked_instance.testsuite).ztest_suite_names = [
        'dummy_testsuite_name'
    ]
    handler_time = mock.Mock()

    with mock.patch.object(Handler, '_missing_suite_name') as missing_mock:
        Handler(mocked_instance)._verify_ztest_suite_name(
            'passed',
            detected_suite_names,
            handler_time
        )

    if should_be_called:
        missing_mock.assert_called_once()
        return

    missing_mock.assert_not_called()
184
185
def test_handler_missing_suite_name(mocked_instance):
    """Check that _missing_suite_name() fails the instance and testcases.

    After the call the instance must be 'failed' with the reason
    'Testsuite mismatch', carry the passed handler time as its execution
    time, and every testcase must be 'failed' as well.
    """
    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance)
    handler.suite_name_check = True

    handler_time = mock.Mock()

    handler._missing_suite_name(['dummy_testsuite_name'], handler_time)

    assert handler.instance.status == 'failed'
    assert handler.instance.execution_time == handler_time
    assert handler.instance.reason == 'Testsuite mismatch'
    for testcase in handler.instance.testcases:
        assert testcase.status == 'failed'
205
206
def test_handler_record(mocked_instance):
    """Check that Handler.record() writes the harness recording as CSV.

    ``open`` and ``csv.writer`` are mocked; the test verifies the file that
    is opened, the arguments given to the writer constructor and the rows
    handed to ``writerow``.
    """
    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance)
    handler.suite_name_check = True

    harness = twisterlib.harness.Test()
    harness.recording = ['dummy recording']
    # NOTE: this installs the property on the harness class itself and it
    # is not removed when the test ends.
    type(harness).fieldnames = mock.PropertyMock(return_value=[])

    writerow_mock = mock.Mock()
    writer_mock = mock.Mock(writerow=writerow_mock)

    open_patch = mock.patch('builtins.open', mock.mock_open(read_data=''))
    writer_patch = mock.patch(
        'csv.writer',
        mock.Mock(return_value=writer_mock)
    )

    with open_patch as mock_file, writer_patch as mock_writer_constructor:
        handler.record(harness)

    recording_path = os.path.join(mocked_instance.build_dir, 'recording.csv')
    mock_file.assert_called_with(recording_path, 'at')

    # mock_file() returns the same handle that record() received from open().
    mock_writer_constructor.assert_called_with(
        mock_file(),
        harness.fieldnames,
        lineterminator=os.linesep
    )

    expected_rows = [mock.call(harness.fieldnames)]
    expected_rows.extend(mock.call(entry) for entry in harness.recording)
    writerow_mock.assert_has_calls(expected_rows)
246
247
def test_handler_terminate(mocked_instance):
    """Check that Handler.terminate() signals child processes and the proc.

    ``psutil.Process`` and ``os.kill`` are mocked. The second call uses a
    child with a negative PID, for which the fake ``os.kill`` raises
    ProcessLookupError; both children must still be signalled.
    """
    def fake_os_kill(pid, sig):
        # Simulate a missing process for negative PIDs.
        if pid < 0:
            raise ProcessLookupError

    handler = Handler(mocked_instance)

    first_child = mock.Mock(pid=1)
    second_child = mock.Mock(pid=2)
    parent_process = mock.Mock()
    parent_process.children = mock.Mock(
        return_value=[first_child, second_child]
    )

    proc = mock.Mock(pid=0)
    proc.terminate = mock.Mock(return_value=None)
    proc.kill = mock.Mock(return_value=None)

    kill_patch = mock.patch('os.kill', mock.Mock(side_effect=fake_os_kill))

    with mock.patch('psutil.Process', return_value=parent_process), \
         kill_patch as mock_kill:
        handler.terminate(proc)

        assert handler.terminated
        proc.terminate.assert_called_once()
        proc.kill.assert_called_once()
        mock_kill.assert_has_calls(
            [mock.call(1, signal.SIGTERM), mock.call(2, signal.SIGTERM)]
        )

        # Second round: the first child can no longer be found.
        missing_child = mock.Mock(pid=-1)
        parent_process.children = mock.Mock(
            return_value=[missing_child, second_child]
        )
        handler.terminated = False
        mock_kill.reset_mock()

        handler.terminate(proc)

    mock_kill.assert_has_calls(
        [mock.call(-1, signal.SIGTERM), mock.call(2, signal.SIGTERM)]
    )
292
293
def test_binaryhandler_try_kill_process_by_pid(mocked_instance):
    """Check that try_kill_process_by_pid() kills the PID from the pid file.

    Runs once with a pid file containing '1' and once with '-1' (for which
    the fake ``os.kill`` raises ProcessLookupError). In both rounds the pid
    file must be unlinked and ``os.kill`` called with SIGKILL.
    """
    def fake_os_kill(pid, sig):
        if pid < 0:
            raise ProcessLookupError

    pid_path = os.path.join('dummy', 'path', 'to', 'pid.pid')
    handler = BinaryHandler(mocked_instance, 'build')

    with mock.patch(
        'os.kill',
        mock.Mock(side_effect=fake_os_kill)
    ) as mock_kill, \
         mock.patch('os.unlink', mock.Mock()) as mock_unlink:
        for pid_text, expected_pid in (('1', 1), ('-1', -1)):
            # Start every round from a clean slate; pid_fn is set again
            # before each call.
            mock_unlink.reset_mock()
            mock_kill.reset_mock()
            handler.pid_fn = pid_path

            with mock.patch(
                'builtins.open',
                mock.mock_open(read_data=pid_text)
            ):
                handler.try_kill_process_by_pid()

            mock_unlink.assert_called_once_with(pid_path)
            mock_kill.assert_called_once_with(expected_pid, signal.SIGKILL)
328
329
# Parameters for test_binaryhandler_output_handler.
# Columns: proc_stdout, harness, expected_handler_calls,
#          expected_harness_calls, should_be_less, timeout_wait
# should_be_less: True  -> fewer log writes than stdout lines are expected,
#                 False -> exactly as many writes as lines are expected,
#                 None  -> the write count is not checked.
# timeout_wait: whether the mocked proc.wait() raises TimeoutExpired.
TESTDATA_3 = [
    # 'no timeout'
    (
        [b'This\\r\\n', b'is\r', b'a short', b'file.'],
        mock.Mock(state=False, capture_coverage=False),
        [
            mock.call('This\\r\\n'),
            mock.call('is\r'),
            mock.call('a short'),
            mock.call('file.')
        ],
        [
            mock.call('This'),
            mock.call('is'),
            mock.call('a short'),
            mock.call('file.')
        ],
        None,
        False
    ),
    # 'timeout'
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=False, capture_coverage=False),
        None,
        None,
        True,
        False
    ),
    # 'timeout with harness state'
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=True, capture_coverage=False),
        None,
        None,
        True,
        False
    ),
    # 'timeout with capture_coverage, wait timeout'
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=True, capture_coverage=True),
        None,
        None,
        False,
        True
    ),
]
374
@pytest.mark.parametrize(
    'proc_stdout, harness, expected_handler_calls,'
    ' expected_harness_calls, should_be_less, timeout_wait',
    TESTDATA_3,
    ids=[
        'no timeout',
        'timeout',
        'timeout with harness state',
        'timeout with capture_coverage, wait timeout'
    ]
)
def test_binaryhandler_output_handler(
    mocked_instance,
    faux_timer,
    proc_stdout,
    harness,
    expected_handler_calls,
    expected_harness_calls,
    should_be_less,
    timeout_wait
):
    """Check BinaryHandler._output_handler() against a scripted process.

    The process stdout replays ``proc_stdout`` line by line, ``time.time``
    is replaced by the ``faux_timer`` clock and the log file is a mocked
    ``open``. Depending on the parameters the test checks the log writes,
    the harness calls, the number of writes and whether the process was
    terminated.
    """
    class MockStdout(mock.Mock):
        """Stdout stand-in whose readline() replays the given lines."""

        def __init__(self, text):
            super().__init__(text)
            self.text = text
            self.line_index = 0

        def readline(self):
            if self.line_index != len(self.text):
                line = self.text[self.line_index]
                self.line_index += 1
                return line
            # All lines consumed: report EOF and rewind.
            self.line_index = 0
            return b''

    class MockProc(mock.Mock):
        """Process stand-in with a scripted stdout and wait()."""

        def __init__(self, pid, stdout):
            super().__init__(pid, stdout)
            self.pid = mock.PropertyMock(return_value=pid)
            self.stdout = MockStdout(stdout)

        def wait(self, *args, **kwargs):
            if timeout_wait:
                raise TimeoutExpired('dummy cmd', 'dummyamount')

    handler = BinaryHandler(mocked_instance, 'build')
    handler.terminate = mock.Mock()
    handler.options = mock.Mock(timeout_multiplier=1)

    proc = MockProc(1, proc_stdout)

    open_patch = mock.patch('builtins.open', mock.mock_open(read_data=''))
    with open_patch as mock_file, \
         mock.patch('time.time', side_effect=faux_timer.time):
        handler._output_handler(proc, harness)

        mock_file.assert_called_with(handler.log, 'wt')

    log_write = mock_file.return_value.write

    if expected_handler_calls:
        log_write.assert_has_calls(expected_handler_calls)

    if expected_harness_calls:
        harness.handle.assert_has_calls(expected_harness_calls)

    if should_be_less is not None:
        line_count = len(proc_stdout)
        if should_be_less:
            assert log_write.call_count < line_count
        else:
            assert log_write.call_count == line_count

    if timeout_wait:
        handler.terminate.assert_called_once_with(proc)
447
448
# Parameters for test_binaryhandler_create_command.
# Columns: robot_test, call_make_run, call_west_flash, enable_valgrind,
#          seed, extra_args, expected
# (expected: the command list _create_command() must return.)
TESTDATA_4 = [
    # 'robot, valgrind'
    (True, False, False, True, None, None,
     ['valgrind', '--error-exitcode=2', '--leak-check=full',
      f'--suppressions={ZEPHYR_BASE}/scripts/valgrind.supp',
      '--log-file=build_dir/valgrind.log', '--track-origins=yes',
      'generator', 'run_renode_test']),
    # 'make run, seed'
    (False, True, False, False, 123, None, ['generator', 'run', '--seed=123']),
    # 'west flash'
    (False, False, True, False, None, None,
     ['west', 'flash', '--skip-rebuild', '-d', 'build_dir']),
    # 'binary, extra'
    (False, False, False, False, None, ['ex1', 'ex2'], ['bin', 'ex1', 'ex2']),
]
460
@pytest.mark.parametrize(
    'robot_test, call_make_run, call_west_flash, enable_valgrind, seed,'
    ' extra_args, expected',
    TESTDATA_4,
    ids=['robot, valgrind', 'make run, seed', 'west flash', 'binary, extra']
)
def test_binaryhandler_create_command(
    mocked_instance,
    robot_test,
    call_make_run,
    call_west_flash,
    enable_valgrind,
    seed,
    extra_args,
    expected
):
    """Check the command list built by BinaryHandler._create_command().

    The handler is configured from the parameters and the returned command
    is compared with ``expected``.
    """
    handler = BinaryHandler(mocked_instance, 'build')

    # Applied in this order, one attribute at a time.
    handler_settings = {
        'generator_cmd': 'generator',
        'binary': 'bin',
        'call_make_run': call_make_run,
        'call_west_flash': call_west_flash,
        'options': mock.Mock(enable_valgrind=enable_valgrind),
        'seed': seed,
        'extra_test_args': extra_args,
        'build_dir': 'build_dir',
    }
    for attr_name, value in handler_settings.items():
        setattr(handler, attr_name, value)

    assert handler._create_command(robot_test) == expected
490
491
# Parameters for test_binaryhandler_create_env.
# Columns: enable_asan, enable_lsan, enable_ubsan
TESTDATA_5 = [
    (False, False, False),
    (True, False, False),
    (True, True, False),
    (False, False, True),
]
498
@pytest.mark.parametrize(
    'enable_asan, enable_lsan, enable_ubsan',
    TESTDATA_5,
    ids=['none', 'asan', 'asan, lsan', 'ubsan']
)
def test_binaryhandler_create_env(
    mocked_instance,
    enable_asan,
    enable_lsan,
    enable_ubsan
):
    """Check the environment produced by BinaryHandler._create_env().

    ``os.environ`` is replaced by a small dict. Unrelated variables must be
    carried over, and the sanitizer option variables must contain the
    original value plus the expected extra options for every enabled
    sanitizer.
    """
    handler = BinaryHandler(mocked_instance, 'build')
    handler.options = mock.Mock(
        enable_asan=enable_asan,
        enable_lsan=enable_lsan,
        enable_ubsan=enable_ubsan
    )

    env = {
        'example_env_var': True,
        'ASAN_OPTIONS': 'dummy=dummy:',
        'UBSAN_OPTIONS': 'dummy=dummy:'
    }

    with mock.patch('os.environ', env):
        res = handler._create_env()

    assert env['example_env_var'] == res['example_env_var']

    if enable_ubsan:
        assert env['UBSAN_OPTIONS'] in res['UBSAN_OPTIONS']
        for option in ('log_path=stdout:', 'halt_on_error=1:'):
            assert option in res['UBSAN_OPTIONS']

    if enable_asan:
        assert env['ASAN_OPTIONS'] in res['ASAN_OPTIONS']
        assert 'log_path=stdout:' in res['ASAN_OPTIONS']

    if enable_asan and not enable_lsan:
        assert 'detect_leaks=0' in res['ASAN_OPTIONS']
539
540
# Parameters for test_binaryhandler_update_instance_info.
# Columns: harness_state, terminated, returncode, enable_valgrind,
#          expected_status, expected_reason, do_add_missing
# (do_add_missing: whether instance.add_missing_case_status must be called
# with 'blocked' and the expected reason.)
TESTDATA_6 = [
    (None, False, 2, True, 'failed', 'Valgrind error', False),
    (None, False, 1, False, 'failed', 'Failed', False),
    ('failed', False, 0, False, 'failed', 'Failed', False),
    ('success', False, 0, False, 'success', 'Unknown', False),
    (None, True, 1, True, 'failed', 'Timeout', True),
]
548
@pytest.mark.parametrize(
    'harness_state, terminated, returncode, enable_valgrind,'
    ' expected_status, expected_reason, do_add_missing',
    TESTDATA_6,
    ids=['valgrind error', 'failed', 'harness failed', 'success', 'no state']
)
def test_binaryhandler_update_instance_info(
    mocked_instance,
    harness_state,
    terminated,
    returncode,
    enable_valgrind,
    expected_status,
    expected_reason,
    do_add_missing
):
    """Check the instance fields set by _update_instance_info().

    The handler gets the parametrized ``terminated``/``returncode`` state
    and valgrind option; afterwards the instance's execution time, status
    and reason are compared with the expectations.
    """
    elapsed = 59
    add_missing_mock = mock.Mock()

    handler = BinaryHandler(mocked_instance, 'build')
    handler.terminated = terminated
    handler.returncode = returncode
    handler.options = mock.Mock(enable_valgrind=enable_valgrind)
    handler.instance.add_missing_case_status = add_missing_mock

    handler._update_instance_info(harness_state, elapsed)

    assert handler.instance.execution_time == elapsed
    assert handler.instance.status == expected_status
    assert handler.instance.reason == expected_reason

    if not do_add_missing:
        return

    add_missing_mock.assert_called_once_with('blocked', expected_reason)
582
583
# Parameters for test_binaryhandler_handle.
# Columns: is_robot_test, coverage, isatty
TESTDATA_7 = [
    (True, False, False),
    (False, True, False),
    (False, False, True),
]
589
@pytest.mark.parametrize(
    'is_robot_test, coverage, isatty',
    TESTDATA_7,
    ids=['robot test', 'coverage', 'isatty']
)
def test_binaryhandler_handle(
    mocked_instance,
    caplog,
    is_robot_test,
    coverage,
    isatty
):
    """Check BinaryHandler.handle() with all collaborators mocked.

    For a robot test only the ``run_robot_test`` call is verified.
    Otherwise the test checks the log message, that the spawned thread was
    joined, that the instance info/final actions were invoked once, and the
    ``subprocess.call`` invocations expected for coverage and for a tty.
    """
    spawned_thread = mock.Mock()

    def fake_popen(command, *args, **kwargs):
        # Usable as a context manager yielding a finished process.
        process = mock.Mock(pid=0, returncode=0)
        return mock.Mock(
            __enter__=mock.Mock(return_value=process),
            __exit__=mock.Mock(return_value=None)
        )

    def fake_thread(target, *args, **kwargs):
        return spawned_thread

    handler = BinaryHandler(mocked_instance, 'build')
    handler.sourcedir = 'source_dir'
    handler.build_dir = 'build_dir'
    handler.name = 'Dummy Name'
    handler._create_command = mock.Mock(return_value=['dummy', 'command'])
    handler._create_env = mock.Mock(return_value=[])
    for stubbed_method in (
        '_update_instance_info',
        '_final_handle_actions',
        'terminate',
        'try_kill_process_by_pid'
    ):
        setattr(handler, stubbed_method, mock.Mock())
    handler.options = mock.Mock(coverage=coverage)

    robot_mock = mock.Mock()
    harness = mock.Mock(is_robot_test=is_robot_test, run_robot_test=robot_mock)

    call_mock = mock.Mock()
    popen_mock = mock.Mock(side_effect=fake_popen)
    thread_mock = mock.Mock(side_effect=fake_thread)

    with mock.patch('subprocess.call', call_mock), \
         mock.patch('subprocess.Popen', popen_mock), \
         mock.patch('threading.Thread', thread_mock), \
         mock.patch('sys.stdout.isatty', return_value=isatty):
        handler.handle(harness)

    if is_robot_test:
        robot_mock.assert_called_once_with(['dummy', 'command'], mock.ANY)
        return

    assert 'Spawning BinaryHandler Thread for Dummy Name' in caplog.text

    spawned_thread.join.assert_called()
    handler._update_instance_info.assert_called_once()
    handler._final_handle_actions.assert_called_once()

    if coverage:
        gcov_command = ['GCOV_PREFIX=build_dir', 'gcov', 'source_dir',
                        '-b', '-s', 'build_dir']
        call_mock.assert_any_call(gcov_command, shell=True)

    if isatty:
        call_mock.assert_any_call(['stty', 'sane'], stdin=mock.ANY)
657
658
# Parameters for test_simulationhandler_init; the first column also serves
# as the test id.
# Columns: type_str, is_pid_fn, expected_call_make_run, is_binary,
#          expected_ready
TESTDATA_8 = [
    ('renode', True, True, False, False),
    ('native', False, False, False, True),
    ('build', False, True, False, False),
]
664
@pytest.mark.parametrize(
    'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready',
    TESTDATA_8,
    ids=[case[0] for case in TESTDATA_8]
)
def test_simulationhandler_init(
    mocked_instance,
    type_str,
    is_pid_fn,
    expected_call_make_run,
    is_binary,
    expected_ready
):
    """Check the attributes SimulationHandler sets up for a given type.

    Verifies ``call_make_run`` and ``ready`` for every simulation type and,
    where requested, the ``pid_fn`` path inside the build directory.
    """
    handler = SimulationHandler(mocked_instance, type_str)

    assert handler.call_make_run == expected_call_make_run
    assert handler.ready == expected_ready

    # NOTE(review): no TESTDATA_8 row sets is_binary, so the second check is
    # never exercised; it also compares pid_fn with the zephyr.exe path -
    # presumably handler.binary was intended. Confirm against handlers.py.
    pid_fn_checks = (
        (is_pid_fn, ('renode.pid',)),
        (is_binary, ('zephyr', 'zephyr.exe')),
    )
    for enabled, path_parts in pid_fn_checks:
        if enabled:
            assert handler.pid_fn == os.path.join(mocked_instance.build_dir,
                                                  *path_parts)
689
690
# Parameters for test_devicehandler_monitor_serial.
# Columns: success_count, in_waiting_count, oserror_count,
#          readline_error_count, haltless_count, stateless_count,
#          end_by_halt, end_by_close, end_by_state, expected_line_count
# The *_count columns size the scripted side effects of the mocked serial
# port, halt event and harness; a count of -1 is only used where the
# matching end_by_* flag is False, in which case the count is not used.
TESTDATA_9 = [
    (3, 2, 0, 0, 3, -1, True, False, False, 1),
    (4, 1, 0, 0, -1, -1, False, True, False, 0),
    (5, 0, 1, 2, -1, 4, False, False, True, 3)
]
696
@pytest.mark.parametrize(
    'success_count, in_waiting_count, oserror_count, readline_error_count,'
    ' haltless_count, stateless_count, end_by_halt, end_by_close,'
    ' end_by_state, expected_line_count',
    TESTDATA_9,
    ids=[
      'halt event',
      'serial closes',
      'harness state with errors'
    ]
)
def test_devicehandler_monitor_serial(
    mocked_instance,
    success_count,
    in_waiting_count,
    oserror_count,
    readline_error_count,
    haltless_count,
    stateless_count,
    end_by_halt,
    end_by_close,
    end_by_state,
    expected_line_count
):
    """Check DeviceHandler.monitor_serial() against a scripted serial port.

    The serial port, halt event and harness are mocks whose side effects
    are sized by the ``*_count`` parameters. The test verifies that the
    port is closed (unless the scenario is 'serial closes') and that the
    harness handled the first ``expected_line_count`` lines.
    """
    # readline(): first the requested errors, alternating SerialException
    # (even index) and TypeError (odd index), then the encoded lines.
    read_errors = [
        TypeError('dummy TypeError') if idx % 2
        else SerialException('dummy SerialException')
        for idx in range(readline_error_count)
    ]
    read_lines = [
        f'line no {idx}'.encode('utf-8') for idx in range(success_count)
    ]
    line_iter = read_errors + read_lines

    # in_waiting: either some idle polls ending in a TypeError, or optional
    # OSErrors and idle polls followed by one True per readline() call.
    if end_by_close:
        in_waiting_iter = [False] * in_waiting_count
        in_waiting_iter.append(TypeError('dummy TypeError'))
    else:
        in_waiting_iter = (
            [OSError('dummy OSError')] * oserror_count
            + [False] * in_waiting_count
            + [True] * (success_count + readline_error_count)
        )

    # The endless iterators keep returning False.
    if end_by_halt:
        is_set_iter = [False] * haltless_count + [True]
    else:
        is_set_iter = iter(lambda: False, True)

    if end_by_state:
        state_iter = [False] * stateless_count + [True]
    else:
        state_iter = iter(lambda: False, True)

    is_open_iter = iter(lambda: True, False)

    halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter))

    ser = mock.Mock(
        isOpen=mock.Mock(side_effect=is_open_iter),
        readline=mock.Mock(side_effect=line_iter)
    )
    type(ser).in_waiting = mock.PropertyMock(
        side_effect=in_waiting_iter,
        return_value=False
    )

    harness = mock.Mock(capture_coverage=False)
    type(harness).state = mock.PropertyMock(side_effect=state_iter)

    handler = DeviceHandler(mocked_instance, 'build')
    handler.options = mock.Mock(coverage=not end_by_state)

    with mock.patch('builtins.open', mock.mock_open(read_data='')):
        handler.monitor_serial(ser, halt_event, harness)

    if not end_by_close:
        ser.close.assert_called_once()

    expected_handle_calls = [
        mock.call(f'line no {idx}') for idx in range(expected_line_count)
    ]
    harness.handle.assert_has_calls(expected_handle_calls)
766
767
# Parameters for test_devicehandler_device_is_available.
# Columns: platform_name, fixture, duts, expected
# expected is interpreted by the test as:
#   int            -> index into duts of the device that must be returned,
#   exception type -> the exception device_is_available() must raise,
#   None           -> no device must be returned.
TESTDATA_10 = [
    # 'one good dut'
    (
        'dummy_platform',
        'dummy fixture',
        [
            # matching platform, but no fixtures
            mock.Mock(
                fixtures=[],
                platform='dummy_platform',
                available=1,
                counter=0
            ),
            # matching fixture, but another platform
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='another_platform',
                available=1,
                counter=0
            ),
            # matching platform and fixture, but neither serial nor pty
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=None,
                serial=None,
                available=1,
                counter=0
            ),
            # matching platform and fixture, with a serial pty
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=1,
                counter=0
            )
        ],
        3
    ),
    # 'exception - no duts'
    (
        'dummy_platform',
        'dummy fixture',
        [],
        TwisterException
    ),
    # 'no available duts': every dut has available=0
    (
        'dummy_platform',
        'dummy fixture',
        [
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['another fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['another fixture'],
                platform='dummy_platform',
                serial=mock.Mock(),
                available=0
            )
        ],
        None
    )
]
841
@pytest.mark.parametrize(
    'platform_name, fixture, duts, expected',
    TESTDATA_10,
    ids=['one good dut', 'exception - no duts', 'no available duts']
)
def test_devicehandler_device_is_available(
    mocked_instance,
    platform_name,
    fixture,
    duts,
    expected
):
    """Check which device DeviceHandler.device_is_available() hands out.

    ``expected`` selects the check: an int is the index of the dut that
    must be returned (and marked as taken), ``None`` means no device may be
    returned, and an exception type means the call must raise it.
    """
    mocked_instance.platform.name = platform_name
    mocked_instance.testsuite.harness_config = {'fixture': fixture}

    handler = DeviceHandler(mocked_instance, 'build')
    handler.duts = duts

    if isinstance(expected, int):
        device = handler.device_is_available(mocked_instance)

        assert device == duts[expected]
        assert device.available == 0
        assert device.counter == 1
        return

    if expected is None:
        assert handler.device_is_available(mocked_instance) is None
        return

    if isinstance(expected, type):
        with pytest.raises(expected):
            handler.device_is_available(mocked_instance)
        return

    # Unsupported kind of expectation in the test data.
    assert False
875
876
def test_devicehandler_make_device_available(mocked_instance):
    """Check that make_device_available() frees only the matching duts.

    Two duts reference the given serial object (one through ``serial``, one
    through ``serial_pty``) and must become available; the third dut uses a
    different serial object and must stay unavailable.
    """
    serial = mock.Mock(name='dummy_serial')
    duts = [
        mock.Mock(available=0, serial=serial, serial_pty=None),
        mock.Mock(available=0, serial=None, serial_pty=serial),
        mock.Mock(
            available=0,
            # The first positional argument of Mock is ``spec``, so the
            # mock has to be named through the ``name`` keyword.
            serial=mock.Mock(name='another_serial'),
            serial_pty=None
        )
    ]

    handler = DeviceHandler(mocked_instance, 'build')
    handler.duts = duts

    handler.make_device_available(serial)

    assert sum(1 for dut in handler.duts if dut.available == 1) == 2
    assert handler.duts[2].available == 0
896
897
# Parameters for test_devicehandler_run_custom_script.
# Columns: mock_process, raise_timeout
# (raise_timeout: whether the mocked communicate() raises TimeoutExpired
# when it is called with a timeout.)
TESTDATA_11 = [
    (mock.Mock(pid=0, returncode=0), False),
    (mock.Mock(pid=0, returncode=1), False),
    (mock.Mock(pid=0, returncode=1), True)
]
903
@pytest.mark.parametrize(
    'mock_process, raise_timeout',
    TESTDATA_11,
    ids=['proper script', 'error', 'timeout']
)
def test_devicehandler_run_custom_script(caplog, mock_process, raise_timeout):
    """Check DeviceHandler.run_custom_script() with a mocked subprocess.

    Covers a script that succeeds, one that returns a non-zero code and one
    whose ``communicate()`` times out; the log output (and, for the timeout,
    the calls made on the process) are verified.
    """
    def fake_communicate(timeout=-1):
        # Only a call that passes a timeout can time out.
        if not raise_timeout or timeout == -1:
            return mock.Mock(), mock.Mock()
        raise subprocess.TimeoutExpired(None, timeout)

    def fake_popen(command, *args, **kwargs):
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock_process),
            __exit__=mock.Mock(return_value=None)
        )

    mock_process.communicate = mock.Mock(side_effect=fake_communicate)

    script = [os.path.join('test','script', 'path'), 'arg']
    timeout = 60

    with mock.patch('subprocess.Popen', side_effect=fake_popen):
        DeviceHandler.run_custom_script(script, timeout)

    if raise_timeout:
        for fragment in (str(script), 'timed out'):
            assert fragment in caplog.text.lower()

        expected_process_calls = [
            mock.call.communicate(timeout=timeout),
            mock.call.kill(),
            mock.call.communicate()
        ]
        mock_process.assert_has_calls(expected_process_calls)
    elif mock_process.returncode == 0:
        assert all(record.levelname != 'ERROR' for record in caplog.records)
    else:
        logged = caplog.text.lower()
        assert 'timed out' not in logged
        assert 'custom script failure' in logged
946
947
# Parameters for test_devicehandler_get_hardware.
# Columns: num_of_failures, raise_exception
# (num_of_failures: how many times the patched device_is_available()
# returns None before it returns the expected hardware.)
TESTDATA_12 = [
    (0, False),
    (4, False),
    (0, True)
]
953
@pytest.mark.parametrize(
    'num_of_failures, raise_exception',
    TESTDATA_12,
    ids=['no failures', 'with failures', 'exception']
)
def test_devicehandler_get_hardware(
    mocked_instance,
    caplog,
    num_of_failures,
    raise_exception
):
    """Check DeviceHandler.get_hardware() with a patched availability check.

    ``device_is_available`` is replaced by a helper that either raises
    TwisterException or returns ``None`` ``num_of_failures`` times before
    returning the expected hardware. On an exception the instance must be
    marked as failed with the exception message as reason; otherwise the
    expected hardware must be returned.
    """
    expected_hardware = mock.Mock()

    def mock_availability(handler, instance):
        if raise_exception:
            raise TwisterException('dummy message')
        # handler.no counts the remaining simulated failures.
        if handler.no:
            handler.no -= 1
            return None
        return expected_hardware

    handler = DeviceHandler(mocked_instance, 'build')
    handler.no = num_of_failures

    with mock.patch.object(
        DeviceHandler,
        'device_is_available',
        mock_availability
    ):
        hardware = handler.get_hardware()

    if raise_exception:
        assert 'dummy message' in caplog.text.lower()
        assert mocked_instance.status == 'failed'
        assert mocked_instance.reason == 'dummy message'
    else:
        assert hardware == expected_hardware
991
992
# Cases for test_devicehandler_create_command.
# Each tuple is (self_west_flash, runner, hardware_product_name, expected):
#   self_west_flash       - value placed in handler.options.west_flash
#   runner                - runner name passed to _create_command()
#   hardware_product_name - value of the mocked hardware's `product`
#   expected              - expected command list; every '$build_dir' entry
#                           is a placeholder the test replaces with
#                           handler.build_dir before comparing
# The mocked hardware always carries runner_params ['param1', 'param2'] and
# the number 12345 as either probe_id or id (see TESTDATA_13_2).
TESTDATA_13 = [
    (
        None,
        None,
        None,
        ['generator_cmd', '-C', '$build_dir', 'flash']
    ),
    (
        [],
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir']
    ),
    (
        '--dummy',
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--', '--dummy']
    ),
    (
        '--dummy1,--dummy2',
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--', '--dummy1', '--dummy2']
    ),

    (
        None,
        'runner',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'runner', 'param1', 'param2']
    ),

    (
        None,
        'pyocd',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'pyocd', 'param1', 'param2', '--', '--dev-id', 12345]
    ),
    (
        None,
        'nrfjprog',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'nrfjprog', 'param1', 'param2', '--', '--dev-id', 12345]
    ),
    (
        None,
        'openocd',
        'STM32 STLink',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'hla_serial 12345']
    ),
    (
        None,
        'openocd',
        'STLINK-V3',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'hla_serial 12345']
    ),
    (
        None,
        'openocd',
        'EDBG CMSIS-DAP',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'cmsis_dap_serial 12345']
    ),
    (
        None,
        'jlink',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'jlink', '--tool-opt=-SelectEmuBySN  12345',  # 2x space
         'param1', 'param2']
    ),
    (
        None,
        'stm32cubeprogrammer',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'stm32cubeprogrammer', '--tool-opt=sn=12345',
         'param1', 'param2']
    ),

]

# hardware_probe values for test_devicehandler_create_command:
# True  -> the mocked hardware has probe_id=12345 and id=None
# False -> the mocked hardware has id=12345 and probe_id=None
TESTDATA_13_2 = [(True), (False)]
1087
@pytest.mark.parametrize(
    'self_west_flash, runner, hardware_product_name, expected',
    TESTDATA_13,
    ids=['generator', '--west-flash', 'one west flash value',
         'multiple west flash values', 'generic runner', 'pyocd',
         'nrfjprog', 'openocd, STM32 STLink', 'openocd, STLINK-v3',
         'openocd, EDBG CMSIS-DAP', 'jlink', 'stm32cubeprogrammer']
)
@pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id'])
def test_devicehandler_create_command(
    mocked_instance,
    self_west_flash,
    runner,
    hardware_probe,
    hardware_product_name,
    expected
):
    """Verify the flash command DeviceHandler._create_command() assembles.

    The command is built for the given runner and a mocked hardware entry,
    then compared with the expected token list, in which '$build_dir' stands
    for the handler's build directory.
    """
    device_handler = DeviceHandler(mocked_instance, 'build')
    device_handler.options = mock.Mock(west_flash=self_west_flash)
    device_handler.generator_cmd = 'generator_cmd'

    # Resolve the '$build_dir' placeholder against the real build directory.
    expected_command = []
    for token in expected:
        if token == '$build_dir':
            expected_command.append(device_handler.build_dir)
        else:
            expected_command.append(token)

    # The board identifier is exposed through probe_id or through id,
    # never through both at once.
    board_serial = 12345
    if hardware_probe:
        probe_id, board_id = board_serial, None
    else:
        probe_id, board_id = None, board_serial

    hardware = mock.Mock(
        product=hardware_product_name,
        probe_id=probe_id,
        id=board_id,
        runner_params=['param1', 'param2']
    )

    assert device_handler._create_command(runner, hardware) == expected_command
1123
1124
# Cases for test_devicehandler_update_instance_info.
# Each tuple is
# (harness_state, flash_error, expected_status, expected_reason,
#  do_add_missing):
#   harness_state   - state passed to _update_instance_info()
#   flash_error     - flash error flag passed to _update_instance_info()
#   expected_status - instance.status expected afterwards
#   expected_reason - instance.reason expected afterwards ('Unknown' is the
#                     value the mocked_instance fixture starts with)
#   do_add_missing  - whether add_missing_case_status('blocked', reason)
#                     must have been called exactly once
TESTDATA_14 = [
    ('success', False, 'success', 'Unknown', False),
    ('failed', False, 'failed', 'Failed', True),
    ('error', False, 'error', 'Unknown', True),
    (None, True, None, 'Unknown', False),
    (None, False, 'failed', 'Timeout', True),
]
1132
@pytest.mark.parametrize(
    'harness_state, flash_error,'
    ' expected_status, expected_reason, do_add_missing',
    TESTDATA_14,
    ids=['success', 'failed', 'error', 'flash error', 'no status']
)
def test_devicehandler_update_instance_info(
    mocked_instance,
    harness_state,
    flash_error,
    expected_status,
    expected_reason,
    do_add_missing
):
    """Verify what DeviceHandler._update_instance_info() stores on the instance.

    Checks the recorded execution time, the resulting status and reason, and
    - where the case requires it - that the remaining test cases were
    reported as 'blocked' with the same reason.
    """
    elapsed = 59
    add_missing = mock.Mock()

    device_handler = DeviceHandler(mocked_instance, 'build')
    device_handler.instance.add_missing_case_status = add_missing

    device_handler._update_instance_info(harness_state, elapsed, flash_error)

    instance = device_handler.instance
    assert instance.execution_time == elapsed
    assert instance.status == expected_status
    assert instance.reason == expected_reason

    if not do_add_missing:
        return

    add_missing.assert_called_once_with('blocked', expected_reason)
1161
1162
# Cases for test_devicehandler_create_serial_connection.
# Each tuple is
# (serial_device, serial_pty, ser_pty_process, expected_exception,
#  expected_result, terminate_ser_pty_process, make_available):
#   serial_device             - device name passed to the method under test
#   serial_pty                - PTY name passed to the method, or None
#   ser_pty_process           - mocked PTY process passed to the method,
#                               or None
#   expected_exception        - exception the patched serial.Serial raises
#                               and the test expects to propagate, or None
#   expected_result           - whether a non-None result is expected
#   terminate_ser_pty_process - whether terminate() and communicate() must
#                               have been called once on ser_pty_process
#   make_available            - argument expected in the single call to
#                               make_device_available(); a falsy value
#                               skips that check
TESTDATA_15 = [
    ('dummy device', 'dummy pty', None, None, True, False, False),
    (
        'dummy device',
        'dummy pty',
        mock.Mock(communicate=mock.Mock(return_value=('', ''))),
        SerialException,
        False,
        True,
        'dummy pty'
    ),
    (
        'dummy device',
        None,
        None,
        SerialException,
        False,
        False,
        'dummy device'
    )
]
1184
@pytest.mark.parametrize(
    'serial_device, serial_pty, ser_pty_process, expected_exception,'
    ' expected_result, terminate_ser_pty_process, make_available',
    TESTDATA_15,
    ids=['valid', 'serial pty process', 'no serial pty']
)
def test_devicehandler_create_serial_connection(
    mocked_instance,
    serial_device,
    serial_pty,
    ser_pty_process,
    expected_exception,
    expected_result,
    terminate_ser_pty_process,
    make_available
):
    """Exercise DeviceHandler._create_serial_connection() with serial.Serial
    patched.

    When the patched serial.Serial raises, the exception is expected to
    propagate, the instance must be marked as failed with
    'Serial Device Error', and the remaining cases must be reported as
    blocked. Depending on the case, the PTY process must also have been
    terminated and the device released.
    """
    def mock_serial(*args, **kwargs):
        if expected_exception:
            raise expected_exception('')
        return expected_result

    handler = DeviceHandler(mocked_instance, 'build')
    missing_mock = mock.Mock()
    handler.instance.add_missing_case_status = missing_mock
    available_mock = mock.Mock()
    handler.make_device_available = available_mock
    handler.options = mock.Mock(timeout_multiplier=1)

    hardware_baud = 14400
    flash_timeout = 60
    serial_mock = mock.Mock(side_effect=mock_serial)

    # Only expect an exception to escape when the case defines one.
    raises_context = pytest.raises(expected_exception) \
                     if expected_exception else nullcontext()

    # Stays None when the call under test raises.
    result = None
    with mock.patch('serial.Serial', serial_mock), raises_context:
        result = handler._create_serial_connection(serial_device, hardware_baud,
                                                   flash_timeout, serial_pty,
                                                   ser_pty_process)

    if expected_result:
        assert result is not None

    if expected_exception:
        assert handler.instance.status == 'failed'
        assert handler.instance.reason == 'Serial Device Error'

        missing_mock.assert_called_once_with('blocked', 'Serial Device Error')

    if terminate_ser_pty_process:
        ser_pty_process.terminate.assert_called_once()
        ser_pty_process.communicate.assert_called_once()

    if make_available:
        available_mock.assert_called_once_with(make_available)
1240
1241
# Cases for test_devicehandler_get_serial_device.
# Each tuple is (serial_pty, popen_exception, expected_device):
#   serial_pty      - PTY command string passed to _get_serial_device();
#                     the Popen stub asserts it arrives as
#                     ['dummy1', 'dummy2']; None means no PTY
#   popen_exception - exception type the Popen stub raises, or None
#   expected_device - expected first element of the returned tuple
#                     (not checked when popen_exception is set)
TESTDATA_16 = [
    ('dummy1 dummy2', None, 'slave name'),
    ('dummy1,dummy2', CalledProcessError, None),
    (None, None, 'dummy hardware serial'),
]
1247
@pytest.mark.parametrize(
    'serial_pty, popen_exception, expected_device',
    TESTDATA_16,
    ids=['pty', 'pty process error', 'no pty']
)
def test_devicehandler_get_serial_device(
    mocked_instance,
    serial_pty,
    popen_exception,
    expected_device
):
    """Verify which serial device DeviceHandler._get_serial_device() picks.

    subprocess.Popen, pty.openpty and os.ttyname are patched. The Popen stub
    checks the command it is started with and can simulate a failing PTY
    process, in which case the method is expected to return None.
    """
    def mock_popen(command, *args, **kwargs):
        assert command == ['dummy1', 'dummy2']
        if popen_exception:
            # CalledProcessError takes (returncode, cmd), in that order.
            raise popen_exception(1, command)
        return mock.Mock()

    handler = DeviceHandler(mocked_instance, 'build')
    hardware_serial = 'dummy hardware serial'

    popen_mock = mock.Mock(side_effect=mock_popen)
    openpty_mock = mock.Mock(return_value=('master', 'slave'))
    # Produces 'slave name' for the fake slave descriptor.
    ttyname_mock = mock.Mock(side_effect=lambda x: x + ' name')

    with mock.patch('subprocess.Popen', popen_mock), \
         mock.patch('pty.openpty', openpty_mock), \
         mock.patch('os.ttyname', ttyname_mock):
        result = handler._get_serial_device(serial_pty, hardware_serial)

    if popen_exception:
        assert result is None
    else:
        assert result[0] == expected_device
1281
# Cases for test_devicehandler_handle.
# Each tuple is
# (has_hardware, raise_create_serial, raise_popen, raise_timeout,
#  returncode, do_timeout_thread, use_pty,
#  expected_status, expected_reason, expected_logs):
#   has_hardware        - whether get_hardware() yields a hardware mock
#   raise_create_serial - _create_serial_connection() raises SerialException
#   raise_popen         - the Popen stub raises CalledProcessError
#   raise_timeout       - communicate() of the flash process raises
#                         TimeoutExpired
#   returncode          - return code of the mocked flash process
#   do_timeout_thread   - is_alive() result of the mocked thread
#   use_pty             - whether the hardware mock defines a serial_pty
#   expected_status     - expected instance.status (None: not checked)
#   expected_reason     - expected instance.reason (None: not checked)
#   expected_logs       - messages that must appear among the log records
TESTDATA_17 = [
    (False, False, False, False, None, False, False,
     None, None, []),
    (True, True, False, False, None, False, False,
     None, None, []),
    (True, False, True, False, None, False, False,
     'error', 'Device issue (Flash error)', []),
    (True, False, False, True, None, False, False,
     'error', 'Device issue (Timeout)', ['Flash operation timed out.']),
    (True, False, False, False, 1, False, False,
     'error', 'Device issue (Flash error?)', []),
    (True, False, False, False, 0, True, False,
     None, None, ['Timed out while monitoring serial output on IPName']),
    (True, False, False, False, 0, False, True,
     None, None, ['Process Serial PTY terminated outs:  errs ']),
]
1298
@pytest.mark.parametrize(
    'has_hardware, raise_create_serial, raise_popen, raise_timeout,'
    ' returncode, do_timeout_thread, use_pty,'
    ' expected_status, expected_reason, expected_logs',
    TESTDATA_17,
    ids=['no hardware', 'create serial failure', 'popen called process error',
         'communicate timeout', 'nonzero returncode',
         'valid dev, monitor timeout', 'valid pty']
)
def test_devicehandler_handle(
    mocked_instance,
    caplog,
    has_hardware,
    raise_create_serial,
    raise_popen,
    raise_timeout,
    returncode,
    do_timeout_thread,
    use_pty,
    expected_status,
    expected_reason,
    expected_logs
):
    """Drive DeviceHandler.handle() with all collaborators mocked out.

    Hardware lookup, serial device discovery, serial connection creation,
    the flash subprocess, the monitoring thread and the custom scripts are
    replaced by mocks whose behaviour is selected by the parameters. The
    test then checks the emitted log messages, the order of the custom
    script invocations, the resulting status and reason, and that the
    device was released.
    """
    def mock_get_serial(serial_pty, hardware_serial):
        if serial_pty:
            serial_pty_process = mock.Mock(
                name='dummy serial PTY process',
                communicate=mock.Mock(
                    return_value=('', '')
                )
            )
            return 'dummy serial PTY device', serial_pty_process
        return 'dummy serial device', None

    def mock_create_serial(*args, **kwargs):
        if raise_create_serial:
            raise SerialException('dummy cmd', 'dummy msg')
        return mock.Mock(name='dummy serial')

    def mock_thread(*args, **kwargs):
        # A thread that is still alive after join() represents a serial
        # monitoring timeout.
        is_alive_mock = mock.Mock(return_value=bool(do_timeout_thread))

        return mock.Mock(is_alive=is_alive_mock)

    def mock_terminate(proc, *args, **kwargs):
        proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock()))

    def mock_communicate(*args, **kwargs):
        if raise_timeout:
            raise TimeoutExpired('dummy cmd', 'dummyamount')
        return mock.Mock(), mock.Mock()

    def mock_popen(command, *args, **kwargs):
        if raise_popen:
            # CalledProcessError takes (returncode, cmd), in that order.
            raise CalledProcessError(1, 'dummy proc')

        mock_process = mock.Mock(
            pid=1,
            returncode=returncode,
            communicate=mock.Mock(side_effect=mock_communicate)
        )

        # Popen is used as a context manager by the code under test.
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock_process),
            __exit__=mock.Mock(return_value=None)
        )

    hardware = None if not has_hardware else mock.Mock(
        baud=14400,
        runner='dummy runner',
        serial_pty='Serial PTY' if use_pty else None,
        serial='dummy serial',
        pre_script='dummy pre script',
        post_script='dummy post script',
        post_flash_script='dummy post flash script',
        flash_timeout=60,
        flash_with_test=True
    )

    handler = DeviceHandler(mocked_instance, 'build')
    handler.get_hardware = mock.Mock(return_value=hardware)
    handler.options = mock.Mock(
        timeout_multiplier=1,
        west_flash=None,
        west_runner=None
    )
    handler._get_serial_device = mock.Mock(side_effect=mock_get_serial)
    handler._create_command = mock.Mock(return_value=['dummy', 'command'])
    handler.run_custom_script = mock.Mock()
    handler._create_serial_connection = mock.Mock(
        side_effect=mock_create_serial
    )
    handler.monitor_serial = mock.Mock()
    handler.terminate = mock.Mock(side_effect=mock_terminate)
    handler._update_instance_info = mock.Mock()
    handler._final_handle_actions = mock.Mock()
    handler.make_device_available = mock.Mock()
    handler.instance.platform.name = 'IPName'

    harness = mock.Mock()

    with mock.patch('builtins.open', mock.mock_open(read_data='')), \
         mock.patch('subprocess.Popen', side_effect=mock_popen), \
         mock.patch('threading.Event', mock.Mock()), \
         mock.patch('threading.Thread', side_effect=mock_thread):
        handler.handle(harness)

    handler.get_hardware.assert_called_once()

    messages = [record.msg for record in caplog.records]
    # One assertion per message, so a failure names the missing log line.
    for expected_log in expected_logs:
        assert expected_log in messages

    if not has_hardware:
        return

    handler.run_custom_script.assert_has_calls([
        mock.call('dummy pre script', mock.ANY)
    ])

    if raise_create_serial:
        return

    handler.run_custom_script.assert_has_calls([
        mock.call('dummy pre script', mock.ANY),
        mock.call('dummy post flash script', mock.ANY),
        mock.call('dummy post script', mock.ANY)
    ])

    if expected_reason:
        assert handler.instance.reason == expected_reason
    if expected_status:
        assert handler.instance.status == expected_status

    handler.make_device_available.assert_called_once_with(
        'Serial PTY' if use_pty else 'dummy serial device'
    )
1434
1435
# Cases for test_qemuhandler_init.
# Each tuple is
# (ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof):
#   ignore_qemu_crash              - value set on the mocked testsuite
#   expected_ignore_crash          - expected handler.ignore_qemu_crash
#   expected_ignore_unexpected_eof - expected handler.ignore_unexpected_eof
TESTDATA_18 = [
    (True, True, True),
    (False, False, False),
]
1440
@pytest.mark.parametrize(
    'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof',
    TESTDATA_18,
    ids=['ignore crash', 'qemu crash']
)
def test_qemuhandler_init(
    mocked_instance,
    ignore_qemu_crash,
    expected_ignore_crash,
    expected_ignore_unexpected_eof
):
    """Verify the crash-related flags QEMUHandler sets up on construction.

    The testsuite's ignore_qemu_crash setting must be reflected in both
    ignore_qemu_crash and ignore_unexpected_eof of the new handler.
    """
    mocked_instance.testsuite.ignore_qemu_crash = ignore_qemu_crash

    qemu_handler = QEMUHandler(mocked_instance, 'build')

    observed = (
        qemu_handler.ignore_qemu_crash,
        qemu_handler.ignore_unexpected_eof,
    )
    assert observed[0] == expected_ignore_crash
    assert observed[1] == expected_ignore_unexpected_eof
1458
1459
def test_qemuhandler_get_cpu_time():
    """Verify QEMUHandler._get_cpu_time() against a patched psutil.Process.

    The fake process reports 20.0 of user time and 64.0 of system time;
    the expected result is approximately 84.0.
    """
    fake_times = mock.Mock(user=20.0, system=64.0)

    def mock_process(pid):
        # Stand-in for psutil.Process exposing only cpu_times().
        return mock.Mock(cpu_times=mock.Mock(return_value=fake_times))

    with mock.patch('psutil.Process', mock_process):
        cpu_time = QEMUHandler._get_cpu_time(0)

    assert cpu_time == pytest.approx(84.0)
1475
1476
# Cases for test_qemuhandler_get_sysbuild_build_dir.
# Each tuple is (self_sysbuild, self_build_dir, build_dir, expected):
#   self_sysbuild  - value assigned to handler.instance.testsuite.sysbuild
#   self_build_dir - value assigned to handler.build_dir
#   build_dir      - PropertyMock installed as build_dir on the object the
#                    mocked get_default_domain() returns
#   expected       - expected result of _get_sysbuild_build_dir()
TESTDATA_19 = [
    (
        True,
        os.path.join('self', 'dummy_dir', '1'),
        mock.PropertyMock(return_value=os.path.join('dummy_dir', '1')),
        os.path.join('dummy_dir', '1')
    ),
    (
        False,
        os.path.join('self', 'dummy_dir', '2'),
        mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')),
        os.path.join('self', 'dummy_dir', '2')
    ),
]
1491
@pytest.mark.parametrize(
    'self_sysbuild, self_build_dir, build_dir, expected',
    TESTDATA_19,
    ids=['domains build dir', 'self build dir']
)
def test_qemuhandler_get_sysbuild_build_dir(
    mocked_instance,
    self_sysbuild,
    self_build_dir,
    build_dir,
    expected
):
    """Verify the directory QEMUHandler._get_sysbuild_build_dir() returns.

    domains.Domains.from_file is patched to yield a mocked domains object
    whose default domain exposes the parametrized build_dir property.
    """
    default_domain_getter = mock.Mock()
    # The PropertyMock has to live on the type of the returned domain mock.
    type(default_domain_getter.return_value).build_dir = build_dir
    fake_domains = mock.Mock(get_default_domain=default_domain_getter)

    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.instance.testsuite.sysbuild = self_sysbuild
    qemu_handler.build_dir = self_build_dir

    with mock.patch('domains.Domains.from_file',
                    mock.Mock(return_value=fake_domains)):
        assert qemu_handler._get_sysbuild_build_dir() == expected
1517
1518
# Cases for test_qemuhandler_set_qemu_filenames.
# Each tuple is (self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn):
#   self_log           - value assigned to handler.log
#   self_pid_fn        - value assigned to handler.pid_fn before the call
#   sysbuild_build_dir - argument passed to _set_qemu_filenames()
#   exists_pid_fn      - return value of the patched os.path.exists()
TESTDATA_20 = [
    (
        os.path.join('self', 'dummy_dir', 'log1'),
        os.path.join('self', 'dummy_dir', 'pid1'),
        os.path.join('sysbuild', 'dummy_dir', 'bd1'),
        True
    ),
    (
        os.path.join('self', 'dummy_dir', 'log2'),
        os.path.join('self', 'dummy_dir', 'pid2'),
        os.path.join('sysbuild', 'dummy_dir', 'bd2'),
        False
    ),
]
1533
@pytest.mark.parametrize(
    'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn',
    TESTDATA_20,
    ids=['pid exists', 'pid missing']
)
def test_qemuhandler_set_qemu_filenames(
    mocked_instance,
    self_log,
    self_pid_fn,
    sysbuild_build_dir,
    exists_pid_fn
):
    """Verify the file names QEMUHandler._set_qemu_filenames() assigns.

    os.unlink and os.path.exists are patched. When the pid file is reported
    as existing, exactly one unlink of that pid file is expected.
    """
    remove_mock = mock.Mock()

    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.log = self_log
    qemu_handler.pid_fn = self_pid_fn

    with mock.patch('os.unlink', remove_mock), \
         mock.patch('os.path.exists', mock.Mock(return_value=exists_pid_fn)):
        qemu_handler._set_qemu_filenames(sysbuild_build_dir)

    expected_fifo = os.path.join(mocked_instance.build_dir, 'qemu-fifo')
    expected_pid_file = os.path.join(sysbuild_build_dir, 'qemu.pid')

    assert qemu_handler.fifo_fn == expected_fifo
    assert qemu_handler.pid_fn == expected_pid_file
    assert qemu_handler.log_fn == self_log

    if exists_pid_fn:
        remove_mock.assert_called_once_with(expected_pid_file)
1567
1568
def test_qemuhandler_create_command(mocked_instance):
    """Verify that QEMUHandler._create_command() builds the expected command.

    The expected command is: generator command, '-C', the sysbuild build
    directory, 'run'.
    """
    build_dir = os.path.join('sysbuild', 'dummy_dir')

    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.generator_cmd = 'dummy_cmd'

    expected_command = ['dummy_cmd', '-C', build_dir, 'run']

    assert qemu_handler._create_command(build_dir) == expected_command
1579
1580
# Cases for test_qemuhandler_update_instance_info.
# Each tuple is
# (self_returncode, self_ignore_qemu_crash, self_instance_reason,
#  harness_state, is_timeout,
#  expected_status, expected_reason, expected_called_missing_case):
#   self_returncode              - value assigned to handler.returncode
#   self_ignore_qemu_crash       - value assigned to
#                                  handler.ignore_qemu_crash
#   self_instance_reason         - instance.reason before the call
#   harness_state                - state passed to _update_instance_info()
#   is_timeout                   - timeout flag passed to
#                                  _update_instance_info()
#   expected_status              - expected instance.status afterwards
#   expected_reason              - expected instance.reason afterwards
#   expected_called_missing_case - whether add_missing_case_status('blocked')
#                                  must have been called exactly once
TESTDATA_21 = [
    (
        0,
        False,
        None,
        'good dummy state',
        False,
        None,
        None,
        False
    ),
    (
        1,
        True,
        None,
        'good dummy state',
        False,
        None,
        None,
        False
    ),
    (
        0,
        False,
        None,
        None,
        True,
        'failed',
        'Timeout',
        True
    ),
    (
        1,
        False,
        None,
        None,
        False,
        'failed',
        'Exited with 1',
        True
    ),
    (
        1,
        False,
        'preexisting reason',
        'good dummy state',
        False,
        'failed',
        'preexisting reason',
        True
    ),
]
1633
@pytest.mark.parametrize(
    'self_returncode, self_ignore_qemu_crash,'
    ' self_instance_reason, harness_state, is_timeout,'
    ' expected_status, expected_reason, expected_called_missing_case',
    TESTDATA_21,
    ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail']
)
def test_qemuhandler_update_instance_info(
    mocked_instance,
    self_returncode,
    self_ignore_qemu_crash,
    self_instance_reason,
    harness_state,
    is_timeout,
    expected_status,
    expected_reason,
    expected_called_missing_case
):
    """Verify the status and reason QEMUHandler._update_instance_info() sets.

    The handler's return code, crash-ignore flag and the instance's
    pre-existing reason are prepared from the parameters before the call.
    """
    add_missing = mock.Mock()
    mocked_instance.add_missing_case_status = add_missing
    mocked_instance.reason = self_instance_reason

    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.returncode = self_returncode
    qemu_handler.ignore_qemu_crash = self_ignore_qemu_crash

    qemu_handler._update_instance_info(harness_state, is_timeout)

    instance = qemu_handler.instance
    assert instance.status == expected_status
    assert instance.reason == expected_reason

    if not expected_called_missing_case:
        return

    add_missing.assert_called_once_with('blocked')
1668
1669
def test_qemuhandler_thread_get_fifo_names():
    """Verify the '.in' and '.out' names derived from a FIFO base name."""
    names = QEMUHandler._thread_get_fifo_names('dummy')

    # Unpack explicitly so a wrong number of returned names still fails.
    name_in, name_out = names
    assert name_in == 'dummy.in'
    assert name_out == 'dummy.out'
1677
# Cases for test_qemuhandler_thread_open_files.
# Each tuple is (fifo_in_exists, fifo_out_exists): what the patched
# os.path.exists() reports for 'fifo.in' and 'fifo.out' respectively.
TESTDATA_22 = [
    (False, False),
    (False, True),
    (True, False),
    (True, True),
]
1684
@pytest.mark.parametrize(
    'fifo_in_exists, fifo_out_exists',
    TESTDATA_22,
    ids=['both missing', 'out exists', 'in exists', 'both exist']
)
def test_qemuhandler_thread_open_files(fifo_in_exists, fifo_out_exists):
    """Verify how QEMUHandler._thread_open_files() opens its three files.

    os.unlink, os.mkfifo, os.path.exists and open() are patched. The test
    checks the open() calls for both FIFOs and the log file, and that a FIFO
    reported as already existing gets unlinked.
    """
    fifo_in = 'fifo.in'
    fifo_out = 'fifo.out'
    logfile = 'log.file'

    presence = {fifo_in: fifo_in_exists, fifo_out: fifo_out_exists}

    def mock_exists(path):
        # Only the two FIFO paths are expected to be probed.
        if path not in presence:
            raise ValueError('Unexpected path in mock of os.path.exists')
        return presence[path]

    remove_mock = mock.Mock()

    with mock.patch('os.unlink', remove_mock), \
         mock.patch('os.mkfifo', mock.Mock()), \
         mock.patch('os.path.exists', mock.Mock(side_effect=mock_exists)), \
         mock.patch('builtins.open', mock.mock_open()) as open_mock:
        opened = QEMUHandler._thread_open_files(fifo_in, fifo_out, logfile)
        # Exactly three file objects are expected back.
        _, _, _ = opened

    expected_opens = [
        mock.call('fifo.in', 'wb'),
        mock.call('fifo.out', 'rb', buffering=0),
        mock.call('log.file', 'wt'),
    ]
    open_mock.assert_has_calls(expected_opens)

    for path, existed in (('fifo.in', fifo_in_exists),
                          ('fifo.out', fifo_out_exists)):
        if existed:
            remove_mock.assert_any_call(path)
1724
1725
# Cases for test_qemuhandler_thread_close_files.
# Each tuple is (is_pid, is_lookup_error):
#   is_pid          - whether a pid (12345) is passed instead of None
#   is_lookup_error - whether the patched os.kill() raises
#                     ProcessLookupError
TESTDATA_23 = [
    (False, False),
    (True, True),
    (True, False)
]
1731
@pytest.mark.parametrize(
    'is_pid, is_lookup_error',
    TESTDATA_23,
    ids=['pid missing', 'pid lookup error', 'pid ok']
)
def test_qemuhandler_thread_close_files(is_pid, is_lookup_error):
    """Verify the cleanup done by QEMUHandler._thread_close_files().

    os.unlink and os.kill are patched. All three file objects must be closed
    and both FIFOs unlinked; when a pid is given and the kill does not fail,
    the process must have received SIGTERM.
    """
    terminated_pids = set()

    def mock_kill(pid, sig):
        if is_lookup_error:
            raise ProcessLookupError(f"Couldn't find pid: {pid}.")
        if sig == signal.SIGTERM:
            terminated_pids.add(pid)

    remove_mock = mock.Mock()
    file_mocks = {
        'out': mock.Mock(),
        'in': mock.Mock(),
        'log': mock.Mock(),
    }
    pid = 12345 if is_pid else None

    with mock.patch('os.unlink', remove_mock), \
         mock.patch('os.kill', mock.Mock(side_effect=mock_kill)):
        QEMUHandler._thread_close_files(
            'fifo.in',
            'fifo.out',
            pid,
            file_mocks['out'],
            file_mocks['in'],
            file_mocks['log']
        )

    for file_mock in file_mocks.values():
        file_mock.close.assert_called_once()

    remove_mock.assert_has_calls([mock.call('fifo.in'), mock.call('fifo.out')])

    if is_pid and not is_lookup_error:
        assert pid in terminated_pids
1769
1770
# Cases for test_qemuhandler_thread_update_instance_info.
# Each tuple is (out_state, expected_status, expected_reason):
#   out_state       - state passed to _thread_update_instance_info()
#   expected_status - expected instance.status afterwards
#   expected_reason - expected instance.reason afterwards ('Unknown' is the
#                     value the mocked_instance fixture starts with)
TESTDATA_24 = [
    ('timeout', 'failed', 'Timeout'),
    ('failed', 'failed', 'Failed'),
    ('unexpected eof', 'failed', 'unexpected eof'),
    ('unexpected byte', 'failed', 'unexpected byte'),
    (None, None, 'Unknown'),
]
1778
@pytest.mark.parametrize(
    'out_state, expected_status, expected_reason',
    TESTDATA_24,
    ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown']
)
def test_qemuhandler_thread_update_instance_info(
    mocked_instance,
    out_state,
    expected_status,
    expected_reason
):
    """Verify what QEMUHandler._thread_update_instance_info() stores.

    Checks the execution time, status and reason recorded on the handler's
    instance for the given output state.
    """
    elapsed = 59
    qemu_handler = QEMUHandler(mocked_instance, 'build')

    QEMUHandler._thread_update_instance_info(qemu_handler, elapsed, out_state)

    instance = qemu_handler.instance
    assert instance.execution_time == elapsed
    assert instance.status == expected_status
    assert instance.reason == expected_reason
1799
1800
# Cases for test_qemuhandler_thread.
# Each tuple is
# (content, timeout, pid, harness_states, cputime, capture_coverage,
#  expected_out_state, expected_log_calls):
#   content            - bytes served by the fake '<fifo_fn>.out' file
#   timeout            - value of the mocked testsuite.timeout
#   pid                - number served by the fake pid file; the CPU time
#                        stub raises ProcessLookupError for values <= 0
#   harness_states     - successive values of harness.state
#   cputime            - CPU time returned by the stub: an int, or an
#                        iterator consumed with next()
#   capture_coverage   - value of harness.capture_coverage
#   expected_out_state - state expected to be passed to
#                        _thread_update_instance_info()
#   expected_log_calls - write() calls expected on the log file mock
TESTDATA_25 = [
    (
        ('1\n' * 60).encode('utf-8'),
        60,
        1,
        [None] * 60 + ['success'] * 6,
        1000,
        False,
        'timeout',
        [mock.call('1\n'), mock.call('1\n')]
    ),
    (
        ('1\n' * 60).encode('utf-8'),
        60,
        -1,
        [None] * 60 + ['success'] * 30,
        100,
        False,
        'failed',
        [mock.call('1\n'), mock.call('1\n')]
    ),
    (
        b'',
        60,
        1,
        ['success'] * 3,
        100,
        False,
        'unexpected eof',
        []
    ),
    (
        b'\x81',
        60,
        1,
        ['success'] * 3,
        100,
        False,
        'unexpected byte',
        []
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        600,
        1,
        [None] * 3 + ['success'] * 7,
        100,
        False,
        'success',
        [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')]
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        600,
        0,
        [None] * 3 + ['success'] * 7,
        100,
        False,
        'timeout',
        [mock.call('1\n'), mock.call('2\n')]
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        60,
        1,
        [None] * 3 + ['success'] * 7,
        # NOTE(review): this generator is created once at import time and
        # can only be consumed once, so this case cannot be re-run within
        # the same process.
        (n for n in [100, 100, 10000]),
        True,
        'success',
        [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')]
    ),
]
1873
@pytest.mark.parametrize(
    'content, timeout, pid, harness_states, cputime, capture_coverage,'
    ' expected_out_state, expected_log_calls',
    TESTDATA_25,
    ids=[
        'timeout',
        'harness failed',
        'unexpected eof',
        'unexpected byte',
        'harness success',
        'timeout by pid=0',
        'capture_coverage'
    ]
)
def test_qemuhandler_thread(
    mocked_instance,
    faux_timer,
    content,
    timeout,
    pid,
    harness_states,
    cputime,
    capture_coverage,
    expected_out_state,
    expected_log_calls
):
    """Run QEMUHandler._thread() against fully faked files, clock and poller.

    time.time, open(), select.poll, os.path.exists, the CPU time lookup and
    the _thread_* helper methods are all patched. The test checks the output
    state handed to _thread_update_instance_info() and the lines written to
    the log file mock.
    """
    def mock_cputime(pid):
        if pid <= 0:
            raise ProcessLookupError()
        if isinstance(cputime, int):
            return cputime
        # Otherwise an iterator yielding one value per lookup.
        return next(cputime)

    # The timeout has to be in place before the handler is constructed.
    type(mocked_instance.testsuite).timeout = mock.PropertyMock(
        return_value=timeout
    )
    handler = QEMUHandler(mocked_instance, 'build')
    handler.results = {}
    handler.ignore_unexpected_eof = False
    handler.pid_fn = 'pid_fn'
    handler.fifo_fn = 'fifo_fn'
    handler.options = mock.Mock(timeout_multiplier=1)

    def mocked_open(filename, *args, **kwargs):
        # Serve the pid file and the FIFO output; anything else is empty.
        contents = b''
        if filename == handler.pid_fn:
            contents = str(pid).encode('utf-8')
        elif filename == handler.fifo_fn + '.out':
            contents = content

        fake_file = mock.mock_open(read_data=contents).return_value
        fake_file.__iter__.return_value = contents.splitlines(True)
        return fake_file

    harness = mock.Mock(capture_coverage=capture_coverage, handle=print)
    type(harness).state = mock.PropertyMock(side_effect=harness_states)

    poller = mock.Mock()
    poller.poll = mock.Mock(
        side_effect=itertools.cycle([True, True, True, True, False])
    )

    log_fp_mock = mock.Mock()
    in_fp_mock = mocked_open('fifo_fn.out')
    out_fp_mock = mock.Mock()

    get_fifo_names_mock = mock.Mock(
        return_value=('fifo_fn.in', 'fifo_fn.out')
    )
    open_files_mock = mock.Mock(
        return_value=(out_fp_mock, in_fp_mock, log_fp_mock)
    )
    close_files_mock = mock.Mock()
    update_info_mock = mock.Mock()

    qemu_handler_path = 'twisterlib.handlers.QEMUHandler.'

    with mock.patch('time.time', side_effect=faux_timer.time), \
         mock.patch('builtins.open', new=mocked_open), \
         mock.patch('select.poll', return_value=poller), \
         mock.patch('os.path.exists', return_value=True), \
         mock.patch(qemu_handler_path + '_get_cpu_time', mock_cputime), \
         mock.patch(qemu_handler_path + '_thread_get_fifo_names',
                    get_fifo_names_mock), \
         mock.patch(qemu_handler_path + '_thread_open_files',
                    open_files_mock), \
         mock.patch(qemu_handler_path + '_thread_close_files',
                    close_files_mock), \
         mock.patch(qemu_handler_path + '_thread_update_instance_info',
                    update_info_mock):
        QEMUHandler._thread(
            handler,
            handler.get_test_timeout(),
            handler.build_dir,
            handler.log,
            handler.fifo_fn,
            handler.pid_fn,
            handler.results,
            harness,
            handler.ignore_unexpected_eof
        )

    update_info_mock.assert_called_once_with(
        handler,
        mock.ANY,
        expected_out_state
    )

    log_fp_mock.write.assert_has_calls(expected_log_calls)
1980
1981
# Cases for test_qemuhandler_handle.
# Each tuple is
# (isatty, do_timeout, harness_state, exists_pid_fn, expected_logs):
#   isatty        - NOTE(review): presumably what the patched terminal
#                   check reports; confirm against the test body
#   do_timeout    - whether the mocked process wait() raises TimeoutExpired
#   harness_state - value of harness.state
#   exists_pid_fn - return value of the os.path.exists stub
#   expected_logs - log messages the case expects
TESTDATA_26 = [
    (True, False, None, True,
     ['No timeout, return code from QEMU (1): 1',
      'return code from QEMU (1): 1']),
    (False, True, 'passed', True, ['return code from QEMU (1): 0']),
    (False, True, 'failed', False, ['return code from QEMU (None): 1']),
]
1989
@pytest.mark.parametrize(
    'isatty, do_timeout, harness_state, exists_pid_fn, expected_logs',
    TESTDATA_26,
    ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn']
)
def test_qemuhandler_handle(
    mocked_instance,
    caplog,
    tmp_path,
    isatty,
    do_timeout,
    harness_state,
    exists_pid_fn,
    expected_logs
):
    """
    Check the messages logged by QEMUHandler.handle() with its collaborators
    mocked out.

    subprocess.Popen, builtins.open, threading.Thread, os.path.exists,
    os.unlink and sys.stdout.isatty are patched for the duration of the call,
    and the handler's helper methods are replaced with mocks.

    Parameters (one row of TESTDATA_26 each):
    isatty - value returned by the patched sys.stdout.isatty()
    do_timeout - if true, the mocked process' wait() raises TimeoutExpired
    harness_state - value of the mocked harness' 'state' attribute
    exists_pid_fn - value returned by the patched os.path.exists(), whatever
                    path is queried
    expected_logs - substrings that must all appear in caplog.text afterwards

    The tmp_path fixture is requested but not referenced in the test body.
    """
    def mock_wait(*args, **kwargs):
        # Emulate a process that never finishes within the allotted time.
        if do_timeout:
            raise TimeoutExpired('dummy cmd', 'dummyamount')

    mock_process = mock.Mock(pid=0, returncode=1)
    mock_process.communicate = mock.Mock(
        return_value=(mock.Mock(), mock.Mock())
    )
    mock_process.wait = mock.Mock(side_effect=mock_wait)

    handler = QEMUHandler(mocked_instance, 'build')

    def mock_path_exists(name, *args, **kwargs):
        # Same answer for every path; only the parametrized flag matters.
        return exists_pid_fn

    def mock_popen(command, stdout=None, stdin=None, stderr=None, cwd=None):
        # The returned object works as a context manager that yields
        # mock_process on entry.
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock_process),
            __exit__=mock.Mock(return_value=None),
            communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock()))
        )

    def mock_thread(name=None, target=None, daemon=None, args=None):
        # No real thread is created; the target is never run by this stub.
        return mock.Mock()

    def mock_filenames(sysbuild_build_dir):
        # Stand-in for _set_qemu_filenames: assigns the three file name
        # attributes directly on the handler.
        handler.fifo_fn = os.path.join('dummy', 'qemu-fifo')
        handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid')
        handler.log_fn = os.path.join('dummy', 'log')

    harness = mock.Mock(state=harness_state)
    handler_options_west_flash = []

    sysbuild_build_dir = os.path.join('sysbuild', 'dummydir')
    command = ['generator_cmd', '-C', os.path.join('cmd', 'path'), 'run']

    handler.options = mock.Mock(
        timeout_multiplier=1,
        west_flash=handler_options_west_flash,
        west_runner=None
    )
    handler.run_custom_script = mock.Mock(return_value=None)
    handler.make_device_available = mock.Mock(return_value=None)
    handler._final_handle_actions = mock.Mock(return_value=None)
    handler._create_command = mock.Mock(return_value=command)
    handler._set_qemu_filenames = mock.Mock(side_effect=mock_filenames)
    handler._get_sysbuild_build_dir = mock.Mock(return_value=sysbuild_build_dir)
    handler.terminate = mock.Mock()

    unlink_mock = mock.Mock()

    # NOTE(review): read_data='1' is presumably what handle() reads back as
    # the QEMU pid (the expected logs mention "QEMU (1)") - confirm against
    # the handler implementation.
    with mock.patch('subprocess.Popen', side_effect=mock_popen), \
         mock.patch('builtins.open', mock.mock_open(read_data='1')), \
         mock.patch('threading.Thread', side_effect=mock_thread), \
         mock.patch('os.path.exists', side_effect=mock_path_exists), \
         mock.patch('os.unlink', unlink_mock), \
         mock.patch('sys.stdout.isatty', return_value=isatty):
        handler.handle(harness)

    # Check each expected message on its own so that a failing run points at
    # the specific entry that is missing from the captured log.
    log_text = caplog.text
    for expected_log in expected_logs:
        assert expected_log in log_text
2065
2066
def test_qemuhandler_get_fifo(mocked_instance):
    """
    Check that get_fifo() returns the value stored in the handler's
    fifo_fn attribute.
    """
    fifo_name = 'fifo_fn'

    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.fifo_fn = fifo_name

    assert qemu_handler.get_fifo() == fifo_name
2074