1#!/usr/bin/env python3
2# Copyright (c) 2023 Intel Corporation
3#
4# SPDX-License-Identifier: Apache-2.0
5"""
6Tests for handlers.py classes' methods
7"""
8
9import itertools
10import mock
11import os
12import pytest
13import signal
14import subprocess
15import sys
16
17from contextlib import nullcontext
18from importlib import reload
19from serial import SerialException
20from subprocess import CalledProcessError, TimeoutExpired
21from types import SimpleNamespace
22
23import twisterlib.harness
24
# Value of the ZEPHYR_BASE environment variable (None when it is unset).
# It is interpolated into the expected valgrind command line in TESTDATA_4.
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
26
27from twisterlib.error import TwisterException
28from twisterlib.handlers import (
29    Handler,
30    BinaryHandler,
31    DeviceHandler,
32    QEMUHandler,
33    SimulationHandler
34)
35
36
@pytest.fixture
def mocked_instance(tmp_path):
    """Build the mocked test instance shared by the handler tests.

    The returned mock carries:
      * ``testsuite`` with an empty ``source_dir`` and a ``timeout`` of 60,
      * ``platform`` with no ``binaries`` and a ``timeout_multiplier`` of 2,
      * ``build_dir`` pointing at a real directory created under ``tmp_path``,
      * ``status`` set to ``None`` and ``reason`` set to ``'Unknown'``.
    """
    # PropertyMock objects have to be attached to the type of a mock,
    # hence the type(...) indirection below.
    suite_mock = mock.Mock()
    suite_type = type(suite_mock)
    suite_type.source_dir = mock.PropertyMock(return_value='')
    suite_type.timeout = mock.PropertyMock(return_value=60)

    platform_mock = mock.Mock()
    platform_type = type(platform_mock)
    platform_type.binaries = mock.PropertyMock(return_value=[])
    platform_type.timeout_multiplier = mock.PropertyMock(return_value=2)

    build_path = tmp_path / 'build_dir'
    os.makedirs(build_path)

    instance = mock.Mock()
    type(instance).build_dir = mock.PropertyMock(return_value=str(build_path))
    instance.testsuite = suite_mock
    instance.platform = platform_mock
    instance.status = None
    instance.reason = 'Unknown'

    return instance
62
63
@pytest.fixture
def faux_timer():
    """Provide a deterministic clock usable as a ``time.time`` side effect."""
    class Counter:
        """Fake clock: each ``time()`` call advances ``t`` by one."""

        def __init__(self):
            self.t = 0

        def time(self):
            """Advance the counter and return its new value (1, 2, 3, ...)."""
            advanced = self.t + 1
            self.t = advanced
            return advanced

    return Counter()
75
76
# Cases for test_imports. Each tuple is:
#   (fail_serial, fail_pty, os_name, expected_outs, expected_error)
# expected_outs lists substrings that must show up on stdout; expected_error
# is the exception the module reload must raise, or None for a clean reload.
TESTDATA_1 = [
    (True, False, 'posix', ['Install pyserial python module with pip to use' \
     ' --device-testing option.'], None),
    (False, True, 'nt', [], None),
    (True, True, 'posix', ['Install pyserial python module with pip to use' \
     ' --device-testing option.'], ImportError),
]
84
@pytest.mark.parametrize(
    'fail_serial, fail_pty, os_name, expected_outs, expected_error',
    TESTDATA_1,
    ids=['import serial', 'import pty nt', 'import serial+pty posix']
)
def test_imports(
    capfd,
    fail_serial,
    fail_pty,
    os_name,
    expected_outs,
    expected_error
):
    """Reload ``twisterlib.handlers`` while ``serial``/``pty`` fail to import.

    The reload runs with a patched ``os.name``, a copy of ``sys.modules`` and
    an extra meta path finder. It must either raise ``expected_error`` or
    finish cleanly, and every entry of ``expected_outs`` must appear on the
    captured stdout.
    """
    blocked = {'serial': fail_serial, 'pty': fail_pty}

    class ImportRaiser:
        """Meta path finder raising ImportError for the blocked modules."""

        def find_spec(self, fullname, path, target=None):
            if blocked.get(fullname, False):
                raise ImportError()

    # A None entry in sys.modules makes importing that module fail;
    # modules that are not blocked keep their current entry.
    patched_modules = sys.modules.copy()
    for module_name, should_fail in blocked.items():
        patched_modules[module_name] = (
            None if should_fail else patched_modules[module_name]
        )

    # The raising finder goes first so it is consulted before the real ones.
    patched_meta_path = [ImportRaiser(), *sys.meta_path]

    if expected_error:
        outcome_ctx = pytest.raises(expected_error)
    else:
        outcome_ctx = nullcontext()

    with mock.patch('os.name', os_name), \
         mock.patch.dict('sys.modules', patched_modules, clear=True), \
         mock.patch('sys.meta_path', patched_meta_path), \
         outcome_ctx:
        reload(twisterlib.handlers)

    captured_out, _ = capfd.readouterr()
    for expected_out in expected_outs:
        assert expected_out in captured_out
120
121
def test_handler_final_handle_actions(mocked_instance):
    """Check ``Handler._final_handle_actions`` on a run id mismatch.

    With ``run_id_exists`` set and ``matched_run_id`` unset, the instance and
    its testcases must be marked failed with reason ``'RunID mismatch'``.
    A second call must leave a reason that is already set untouched and must
    pass the harness recording to ``instance.record``.
    """
    frozen_reason = "This reason shan't be changed."

    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance)
    handler.suite_name_check = True

    harness = twisterlib.harness.Test()
    harness.state = mock.Mock()
    harness.detected_suite_names = mock.Mock()
    harness.matched_run_id = False
    harness.run_id_exists = True
    harness.recording = mock.Mock()

    elapsed = mock.Mock()

    # First pass: the mismatch has to fail the instance and all testcases.
    handler._final_handle_actions(harness, elapsed)

    assert handler.instance.status == 'failed'
    assert handler.instance.execution_time == elapsed
    assert handler.instance.reason == 'RunID mismatch'
    for testcase in handler.instance.testcases:
        assert testcase.status == 'failed'

    # Second pass: a pre-existing reason must survive.
    handler.instance.reason = frozen_reason
    handler._final_handle_actions(harness, elapsed)

    mocked_instance.assert_has_calls([mock.call.record(harness.recording)])

    assert handler.instance.reason == frozen_reason
152
153
# Cases for test_handler_verify_ztest_suite_name. Each tuple is:
#   (detected_suite_names, should_be_called)
# should_be_called tells whether Handler._missing_suite_name must be invoked
# when the test expects the single suite 'dummy_testsuite_name'.
TESTDATA_2 = [
    (['dummy_testsuite_name'], False),
    ([], True),
    (['another_dummy_name', 'yet_another_dummy_name'], True),
]
159
@pytest.mark.parametrize(
    'detected_suite_names, should_be_called',
    TESTDATA_2,
    ids=['detected one expected', 'detected none', 'detected two unexpected']
)
def test_handler_verify_ztest_suite_name(
    mocked_instance,
    detected_suite_names,
    should_be_called
):
    """Check when ``_verify_ztest_suite_name`` reports a missing suite.

    The testsuite expects exactly ``'dummy_testsuite_name'``;
    ``_missing_suite_name`` is mocked and must be called once or not at all,
    depending on ``should_be_called``.
    """
    suite_type = type(mocked_instance.testsuite)
    suite_type.ztest_suite_names = ['dummy_testsuite_name']

    elapsed = mock.Mock()

    with mock.patch.object(Handler, '_missing_suite_name') as missing_mock:
        handler = Handler(mocked_instance)
        handler._verify_ztest_suite_name(
            'passed',
            detected_suite_names,
            elapsed
        )

        if not should_be_called:
            missing_mock.assert_not_called()
        else:
            missing_mock.assert_called_once()
189
190
def test_handler_missing_suite_name(mocked_instance):
    """Check that ``_missing_suite_name`` fails the instance and testcases.

    The instance must end up ``'failed'`` with reason ``'Testsuite mismatch'``
    and carry the handler time as its execution time.
    """
    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance)
    handler.suite_name_check = True

    elapsed = mock.Mock()

    handler._missing_suite_name(['dummy_testsuite_name'], elapsed)

    result = handler.instance
    assert result.status == 'failed'
    assert result.execution_time == elapsed
    assert result.reason == 'Testsuite mismatch'
    for testcase in result.testcases:
        assert testcase.status == 'failed'
210
211
def test_handler_terminate(mocked_instance):
    """Check that ``Handler.terminate`` signals children and the process.

    ``psutil.Process`` and ``os.kill`` are mocked. The first call must send
    SIGTERM to both children and call ``terminate`` and ``kill`` on the
    process. The second call, where ``os.kill`` raises ProcessLookupError for
    the first child, must still attempt SIGTERM on both children.
    """
    def fake_os_kill(pid, sig):
        # Negative pids play the role of processes that cannot be found.
        if pid < 0:
            raise ProcessLookupError

    handler = Handler(mocked_instance)

    first_child = mock.Mock(pid=1)
    second_child = mock.Mock(pid=2)
    ps_process = mock.Mock()
    ps_process.children = mock.Mock(return_value=[first_child, second_child])

    proc = mock.Mock(pid=0)
    proc.terminate = mock.Mock(return_value=None)
    proc.kill = mock.Mock(return_value=None)

    kill_mock = mock.Mock(side_effect=fake_os_kill)

    with mock.patch('psutil.Process', return_value=ps_process), \
         mock.patch('os.kill', kill_mock):
        handler.terminate(proc)

        assert handler.terminated
        proc.terminate.assert_called_once()
        proc.kill.assert_called_once()
        kill_mock.assert_has_calls(
            [mock.call(pid, signal.SIGTERM) for pid in (1, 2)]
        )

        # Second round: the first child is one os.kill cannot find.
        missing_child = mock.Mock(pid=-1)
        ps_process.children = mock.Mock(
            return_value=[missing_child, second_child]
        )
        handler.terminated = False
        kill_mock.reset_mock()

        handler.terminate(proc)

    kill_mock.assert_has_calls(
        [mock.call(pid, signal.SIGTERM) for pid in (-1, 2)]
    )
256
257
def test_binaryhandler_try_kill_process_by_pid(mocked_instance):
    """Check ``BinaryHandler.try_kill_process_by_pid`` for two pid files.

    For a pid file containing ``1`` and one containing ``-1`` (for which the
    mocked ``os.kill`` raises ProcessLookupError), the pid file must be
    unlinked once and ``os.kill`` called once with that pid and SIGKILL.
    """
    def fake_os_kill(pid, sig):
        if pid < 0:
            raise ProcessLookupError

    pid_file = os.path.join('dummy', 'path', 'to', 'pid.pid')

    handler = BinaryHandler(mocked_instance, 'build')

    kill_mock = mock.Mock(side_effect=fake_os_kill)
    unlink_mock = mock.Mock()

    with mock.patch('os.kill', kill_mock), \
         mock.patch('os.unlink', unlink_mock):
        for file_content, expected_pid in (('1', 1), ('-1', -1)):
            # Start every round from clean mocks and a freshly set pid_fn.
            unlink_mock.reset_mock()
            kill_mock.reset_mock()
            handler.pid_fn = pid_file

            open_mock = mock.mock_open(read_data=file_content)
            with mock.patch('builtins.open', open_mock):
                handler.try_kill_process_by_pid()

            unlink_mock.assert_called_once_with(pid_file)
            kill_mock.assert_called_once_with(expected_pid, signal.SIGKILL)
292
293
# Cases for test_binaryhandler_output_handler. Each tuple is:
#   (proc_stdout, harness, expected_handler_calls, expected_harness_calls,
#    should_be_less, timeout_wait)
# proc_stdout is the list of byte lines served by the fake process stdout.
# The two *_calls entries are the expected log-file write() calls and
# harness.handle() calls (None skips the check). should_be_less selects
# whether the number of writes must be lower than (True) or equal to (False)
# len(proc_stdout); None skips that check. timeout_wait makes the fake
# proc.wait() raise TimeoutExpired.
TESTDATA_3 = [
    (
        [b'This\\r\\n', b'is\r', b'a short', b'file.'],
        mock.Mock(state=False, capture_coverage=False),
        [
            mock.call('This\\r\\n'),
            mock.call('is\r'),
            mock.call('a short'),
            mock.call('file.')
        ],
        [
            mock.call('This'),
            mock.call('is'),
            mock.call('a short'),
            mock.call('file.')
        ],
        None,
        False
    ),
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=False, capture_coverage=False),
        None,
        None,
        True,
        False
    ),
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=True, capture_coverage=False),
        None,
        None,
        True,
        False
    ),
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=True, capture_coverage=True),
        None,
        None,
        False,
        True
    ),
]
338
@pytest.mark.parametrize(
    'proc_stdout, harness, expected_handler_calls,'
    ' expected_harness_calls, should_be_less, timeout_wait',
    TESTDATA_3,
    ids=[
        'no timeout',
        'timeout',
        'timeout with harness state',
        'timeout with capture_coverage, wait timeout'
    ]
)
def test_binaryhandler_output_handler(
    mocked_instance,
    faux_timer,
    proc_stdout,
    harness,
    expected_handler_calls,
    expected_harness_calls,
    should_be_less,
    timeout_wait
):
    """Feed ``BinaryHandler._output_handler`` from a fake process stdout.

    ``time.time`` is replaced by the ``faux_timer`` counter and
    ``builtins.open`` by a mock. Depending on the case, the test checks the
    exact log writes, the exact ``harness.handle`` calls, how many lines were
    written compared to the input, and that the handler terminates the
    process when ``proc.wait`` times out.
    """
    class MockStdout(mock.Mock):
        """Stdout stand-in serving ``text`` line by line, then ``b''``."""

        def __init__(self, text):
            super().__init__(text)
            self.text = text
            self.line_index = 0

        def readline(self):
            if self.line_index != len(self.text):
                current = self.text[self.line_index]
                self.line_index += 1
                return current
            # Exhausted: rewind so a later read starts over, signal EOF now.
            self.line_index = 0
            return b''

    class MockProc(mock.Mock):
        """Process stand-in exposing ``pid``, ``stdout`` and ``wait``."""

        def __init__(self, pid, stdout):
            super().__init__(pid, stdout)
            self.pid = mock.PropertyMock(return_value=pid)
            self.stdout = MockStdout(stdout)

        def wait(self, *args, **kwargs):
            if timeout_wait:
                raise TimeoutExpired('dummy cmd', 'dummyamount')

    handler = BinaryHandler(mocked_instance, 'build')
    handler.terminate = mock.Mock()
    handler.options = mock.Mock(timeout_multiplier=1)

    proc = MockProc(1, proc_stdout)

    open_mock = mock.mock_open(read_data='')

    with mock.patch('builtins.open', open_mock), \
         mock.patch('time.time', side_effect=faux_timer.time):
        handler._output_handler(proc, harness)

        open_mock.assert_called_with(handler.log, 'wt')

    write_mock = open_mock.return_value.write

    if expected_handler_calls:
        write_mock.assert_has_calls(expected_handler_calls)
    if expected_harness_calls:
        harness.handle.assert_has_calls(expected_harness_calls)
    if should_be_less is not None:
        written = write_mock.call_count
        if should_be_less:
            assert written < len(proc_stdout)
        else:
            assert written == len(proc_stdout)
    if timeout_wait:
        handler.terminate.assert_called_once_with(proc)
411
412
# Cases for test_binaryhandler_create_command. Each tuple is:
#   (robot_test, call_make_run, enable_valgrind, seed, extra_args, expected)
# expected is the full command list _create_command must return.
TESTDATA_4 = [
    (True, False, True, None, None,
     ['valgrind', '--error-exitcode=2', '--leak-check=full',
      f'--suppressions={ZEPHYR_BASE}/scripts/valgrind.supp',
      '--log-file=build_dir/valgrind.log', '--track-origins=yes',
      'generator']),
    (False, True, False, 123, None, ['generator', 'run', '--seed=123']),
    (False, False, False, None, ['ex1', 'ex2'], ['build_dir/zephyr/zephyr.exe', 'ex1', 'ex2']),
]
422
@pytest.mark.parametrize(
    'robot_test, call_make_run, enable_valgrind, seed, extra_args, expected',
    TESTDATA_4,
    ids=['robot, valgrind', 'make run, seed', 'binary, extra']
)
def test_binaryhandler_create_command(
    mocked_instance,
    robot_test,
    call_make_run,
    enable_valgrind,
    seed,
    extra_args,
    expected
):
    """Check the command list built by ``BinaryHandler._create_command``.

    The handler is configured with fixed generator, binary, build directory
    and platform values; the parametrized options select valgrind wrapping,
    a make-run invocation with a seed, or a direct binary call with extra
    arguments.
    """
    handler = BinaryHandler(mocked_instance, 'build')
    handler.generator_cmd = 'generator'
    handler.binary = 'bin'
    handler.build_dir = 'build_dir'
    handler.call_make_run = call_make_run
    handler.seed = seed
    handler.extra_test_args = extra_args
    handler.options = SimpleNamespace(
        enable_valgrind=enable_valgrind,
        coverage_basedir="coverage_basedir"
    )
    handler.platform = SimpleNamespace(resc="file.resc", uart="uart")
    handler.instance.sysbuild = False

    assert handler._create_command(robot_test) == expected
456
457
# Cases for test_binaryhandler_create_env. Each tuple is:
#   (enable_asan, enable_lsan, enable_ubsan)
TESTDATA_5 = [
    (False, False, False),
    (True, False, False),
    (True, True, False),
    (False, False, True),
]
464
@pytest.mark.parametrize(
    'enable_asan, enable_lsan, enable_ubsan',
    TESTDATA_5,
    ids=['none', 'asan', 'asan, lsan', 'ubsan']
)
def test_binaryhandler_create_env(
    mocked_instance,
    enable_asan,
    enable_lsan,
    enable_ubsan
):
    """Check the environment built by ``BinaryHandler._create_env``.

    ``os.environ`` is replaced by a small dict. Unrelated variables must be
    carried over, and the enabled sanitizers must extend the existing
    ``ASAN_OPTIONS``/``UBSAN_OPTIONS`` values with the expected settings.
    """
    handler = BinaryHandler(mocked_instance, 'build')
    handler.options = mock.Mock(
        enable_asan=enable_asan,
        enable_lsan=enable_lsan,
        enable_ubsan=enable_ubsan
    )

    env = {
        'example_env_var': True,
        'ASAN_OPTIONS': 'dummy=dummy:',
        'UBSAN_OPTIONS': 'dummy=dummy:'
    }

    with mock.patch('os.environ', env):
        res = handler._create_env()

    assert env['example_env_var'] == res['example_env_var']

    if enable_ubsan:
        ubsan_fragments = (
            env['UBSAN_OPTIONS'],
            'log_path=stdout:',
            'halt_on_error=1:',
        )
        for fragment in ubsan_fragments:
            assert fragment in res['UBSAN_OPTIONS']

    if enable_asan:
        for fragment in (env['ASAN_OPTIONS'], 'log_path=stdout:'):
            assert fragment in res['ASAN_OPTIONS']

        # Leak detection is expected to be disabled unless LSAN is requested.
        if not enable_lsan:
            assert 'detect_leaks=0' in res['ASAN_OPTIONS']
505
506
# Cases for test_binaryhandler_update_instance_info. Each tuple is:
#   (harness_state, terminated, returncode, enable_valgrind,
#    expected_status, expected_reason, do_add_missing)
# do_add_missing tells whether instance.add_missing_case_status must be
# called with ('blocked', expected_reason).
TESTDATA_6 = [
    (None, False, 2, True, 'failed', 'Valgrind error', False),
    (None, False, 1, False, 'failed', 'Failed', False),
    ('failed', False, 0, False, 'failed', 'Failed', False),
    ('success', False, 0, False, 'success', 'Unknown', False),
    (None, True, 1, True, 'failed', 'Timeout', True),
]
514
@pytest.mark.parametrize(
    'harness_state, terminated, returncode, enable_valgrind,'
    ' expected_status, expected_reason, do_add_missing',
    TESTDATA_6,
    ids=['valgrind error', 'failed', 'harness failed', 'success', 'no state']
)
def test_binaryhandler_update_instance_info(
    mocked_instance,
    harness_state,
    terminated,
    returncode,
    enable_valgrind,
    expected_status,
    expected_reason,
    do_add_missing
):
    """Check status and reason set by ``_update_instance_info``.

    The handler is primed with the parametrized ``terminated``, ``returncode``
    and valgrind option; afterwards the instance must carry the handler time,
    the expected status and reason, and, when requested, must have had its
    missing cases marked ``'blocked'``.
    """
    elapsed = 59
    add_missing = mock.Mock()

    handler = BinaryHandler(mocked_instance, 'build')
    handler.options = mock.Mock(enable_valgrind=enable_valgrind)
    handler.returncode = returncode
    handler.terminated = terminated
    handler.instance.add_missing_case_status = add_missing

    handler._update_instance_info(harness_state, elapsed)

    assert handler.instance.execution_time == elapsed

    assert handler.instance.status == expected_status
    assert handler.instance.reason == expected_reason

    if do_add_missing:
        add_missing.assert_called_once_with('blocked', expected_reason)
548
549
# Cases for test_binaryhandler_handle. Each tuple is:
#   (is_robot_test, coverage, isatty)
TESTDATA_7 = [
    (True, False, False),
    (False, True, False),
    (False, False, True),
]
555
@pytest.mark.parametrize(
    'is_robot_test, coverage, isatty',
    TESTDATA_7,
    ids=['robot test', 'coverage', 'isatty']
)
def test_binaryhandler_handle(
    mocked_instance,
    caplog,
    is_robot_test,
    coverage,
    isatty
):
    """Run ``BinaryHandler.handle`` with process and thread creation mocked.

    Robot tests must be handed to ``harness.run_robot_test`` with the created
    command. Otherwise the handler must log the thread spawn, join the thread
    and call ``_update_instance_info`` and ``_final_handle_actions`` once
    each. When stdout is a tty, ``stty sane`` must be issued through
    ``subprocess.call``.
    """
    spawned_thread = mock.Mock()

    def fake_popen(command, *args, **kwargs):
        # Context-manager stand-in yielding a process with returncode 0.
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock.Mock(pid=0, returncode=0)),
            __exit__=mock.Mock(return_value=None)
        )

    def fake_thread(target, *args, **kwargs):
        return spawned_thread

    handler = BinaryHandler(mocked_instance, 'build')
    handler.sourcedir = 'source_dir'
    handler.build_dir = 'build_dir'
    handler.name = 'Dummy Name'
    handler.options = mock.Mock(coverage=coverage)
    # Collaborating methods are stubbed; only handle() itself runs for real.
    handler._create_command = mock.Mock(return_value=['dummy', 'command'])
    handler._create_env = mock.Mock(return_value=[])
    handler._update_instance_info = mock.Mock()
    handler._final_handle_actions = mock.Mock()
    handler.terminate = mock.Mock()
    handler.try_kill_process_by_pid = mock.Mock()

    run_robot = mock.Mock()
    harness = mock.Mock(is_robot_test=is_robot_test, run_robot_test=run_robot)

    subprocess_call = mock.Mock()
    popen_factory = mock.Mock(side_effect=fake_popen)
    thread_factory = mock.Mock(side_effect=fake_thread)

    with mock.patch('subprocess.call', subprocess_call), \
         mock.patch('subprocess.Popen', popen_factory), \
         mock.patch('threading.Thread', thread_factory), \
         mock.patch('sys.stdout.isatty', return_value=isatty):
        handler.handle(harness)

    if is_robot_test:
        # Nothing else is checked once the robot harness took over.
        run_robot.assert_called_once_with(['dummy', 'command'], mock.ANY)
        return

    assert 'Spawning BinaryHandler Thread for Dummy Name' in caplog.text

    spawned_thread.join.assert_called()
    handler._update_instance_info.assert_called_once()
    handler._final_handle_actions.assert_called_once()

    if isatty:
        subprocess_call.assert_any_call(['stty', 'sane'], stdin=mock.ANY)
616
617
# Cases for test_simulationhandler_init. Each tuple is:
#   (type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready)
# type_str doubles as the pytest id of the case.
# NOTE(review): is_binary is False in every row, so the matching branch of
# the test is never taken - confirm whether a case is missing.
TESTDATA_8 = [
    ('renode', True, True, False, False),
    ('native', False, False, False, True),
    ('build', False, True, False, False),
]
623
@pytest.mark.parametrize(
    'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready',
    TESTDATA_8,
    ids=[t[0] for t in TESTDATA_8]
)
def test_simulationhandler_init(
    mocked_instance,
    type_str,
    is_pid_fn,
    expected_call_make_run,
    is_binary,
    expected_ready
):
    """Check the attributes ``SimulationHandler`` sets up per simulator type.

    ``call_make_run`` and ``ready`` are always verified; ``pid_fn`` is
    verified only for the cases that request it.
    """
    handler = SimulationHandler(mocked_instance, type_str)

    assert handler.call_make_run == expected_call_make_run
    assert handler.ready == expected_ready

    if is_pid_fn:
        renode_pid = os.path.join(mocked_instance.build_dir, 'renode.pid')
        assert handler.pid_fn == renode_pid

    # NOTE(review): no TESTDATA_8 row enables is_binary, and this branch
    # compares pid_fn rather than a binary path - confirm the intent.
    if is_binary:
        zephyr_exe = os.path.join(
            mocked_instance.build_dir, 'zephyr', 'zephyr.exe'
        )
        assert handler.pid_fn == zephyr_exe
648
649
# Cases for test_devicehandler_monitor_serial. Each tuple is:
#   (success_count, in_waiting_count, oserror_count, readline_error_count,
#    haltless_count, stateless_count, end_by_halt, end_by_close,
#    end_by_state, expected_line_count)
# The *_count values size the side-effect sequences of the mocked serial
# port, halt event and harness state; exactly one end_by_* flag is set per
# case. A count of -1 marks a sequence that the case does not use.
TESTDATA_9 = [
    (3, 2, 0, 0, 3, -1, True, False, False, 1),
    (4, 1, 0, 0, -1, -1, False, True, False, 0),
    (5, 0, 1, 2, -1, 4, False, False, True, 3)
]
655
@pytest.mark.parametrize(
    'success_count, in_waiting_count, oserror_count, readline_error_count,'
    ' haltless_count, stateless_count, end_by_halt, end_by_close,'
    ' end_by_state, expected_line_count',
    TESTDATA_9,
    ids=[
      'halt event',
      'serial closes',
      'harness state with errors'
    ]
)
def test_devicehandler_monitor_serial(
    mocked_instance,
    success_count,
    in_waiting_count,
    oserror_count,
    readline_error_count,
    haltless_count,
    stateless_count,
    end_by_halt,
    end_by_close,
    end_by_state,
    expected_line_count
):
    """Drive ``DeviceHandler.monitor_serial`` with scripted mock behaviour.

    The serial port, halt event and harness state are mocks whose side-effect
    sequences are sized by the parameters. Afterwards the serial port must
    have been closed (except in the 'serial closes' case) and the harness
    must have received the first ``expected_line_count`` lines.
    """
    # isOpen() answers True for as long as it is asked.
    is_open_iter = iter(lambda: True, False)

    # readline(): alternating SerialException/TypeError failures first,
    # followed by the successfully read lines.
    readline_failures = [
        TypeError('dummy TypeError') if idx % 2
        else SerialException('dummy SerialException')
        for idx in range(readline_error_count)
    ]
    readline_lines = [
        f'line no {idx}'.encode('utf-8') for idx in range(success_count)
    ]
    line_iter = readline_failures + readline_lines

    # in_waiting: either idle reads ending in a TypeError (the closing-port
    # case), or OSErrors and idle reads followed by one "data ready" answer
    # per readline() entry.
    if end_by_close:
        in_waiting_iter = [False] * in_waiting_count + [
            TypeError('dummy TypeError')
        ]
    else:
        in_waiting_iter = (
            [OSError('dummy OSError')] * oserror_count
            + [False] * in_waiting_count
            + [True] * (success_count + readline_error_count)
        )

    # The halt event and harness state either flip to True after a fixed
    # number of False answers or stay False forever.
    if end_by_halt:
        is_set_iter = [False] * haltless_count + [True]
    else:
        is_set_iter = iter(lambda: False, True)

    if end_by_state:
        state_iter = [False] * stateless_count + [True]
    else:
        state_iter = iter(lambda: False, True)

    halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter))
    ser = mock.Mock(
        isOpen=mock.Mock(side_effect=is_open_iter),
        readline=mock.Mock(side_effect=line_iter)
    )
    type(ser).in_waiting = mock.PropertyMock(
        side_effect=in_waiting_iter,
        return_value=False
    )
    harness = mock.Mock(capture_coverage=False)
    type(harness).state = mock.PropertyMock(side_effect=state_iter)

    handler = DeviceHandler(mocked_instance, 'build')
    handler.options = mock.Mock(enable_coverage=not end_by_state)

    with mock.patch('builtins.open', mock.mock_open(read_data='')):
        handler.monitor_serial(ser, halt_event, harness)

    if not end_by_close:
        ser.close.assert_called_once()

    expected_handle_calls = [
        mock.call(f'line no {idx}') for idx in range(expected_line_count)
    ]
    harness.handle.assert_has_calls(expected_handle_calls)
725
726
# Cases for test_devicehandler_device_is_available. Each tuple is:
#   (platform_name, fixture, duts, expected)
# expected is interpreted by the test as one of:
#   * an int  - index into duts of the device that must be returned,
#   * None    - no device may be returned,
#   * a type  - the exception class that must be raised.
TESTDATA_10 = [
    (
        'dummy_platform',
        'dummy fixture',
        [
            mock.Mock(
                fixtures=[],
                platform='dummy_platform',
                available=1,
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='another_platform',
                available=1,
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=None,
                serial=None,
                available=1,
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=1,
                counter=0
            )
        ],
        3
    ),
    (
        'dummy_platform',
        'dummy fixture',
        [],
        TwisterException
    ),
    (
        'dummy_platform',
        'dummy fixture',
        [
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['another fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['another fixture'],
                platform='dummy_platform',
                serial=mock.Mock(),
                available=0
            )
        ],
        None
    )
]
800
@pytest.mark.parametrize(
    'platform_name, fixture, duts, expected',
    TESTDATA_10,
    ids=['one good dut', 'exception - no duts', 'no available duts']
)
def test_devicehandler_device_is_available(
    mocked_instance,
    platform_name,
    fixture,
    duts,
    expected
):
    """Check which DUT ``DeviceHandler.device_is_available`` hands out.

    ``expected`` selects the kind of outcome:
      * int  - ``duts[expected]`` must be returned, with ``available`` set
               to 0 and ``counter`` set to 1;
      * None - no device may be returned;
      * type - that exception class must be raised.
    Any other value is a mistake in the test data and fails the test.
    """
    mocked_instance.platform.name = platform_name
    mocked_instance.testsuite.harness_config = {'fixture': fixture}

    handler = DeviceHandler(mocked_instance, 'build')
    handler.duts = duts

    if isinstance(expected, int):
        device = handler.device_is_available(mocked_instance)

        assert device == duts[expected]
        assert device.available == 0
        assert device.counter == 1
    elif expected is None:
        device = handler.device_is_available(mocked_instance)

        assert device is None
    elif isinstance(expected, type):
        with pytest.raises(expected):
            handler.device_is_available(mocked_instance)
    else:
        # pytest.fail still fires under ``python -O`` (unlike
        # ``assert False``) and reports the offending value.
        pytest.fail(f'Unsupported expected value in test data: {expected!r}')
834
835
def test_devicehandler_make_device_available(mocked_instance):
    """Check that ``make_device_available`` frees only the matching DUTs.

    Two DUTs reference the released serial (one through ``serial``, one
    through ``serial_pty``) and must become available; the third one uses a
    different serial and must stay unavailable.
    """
    serial = mock.Mock(name='dummy_serial')
    duts = [
        mock.Mock(available=0, serial=serial, serial_pty=None),
        mock.Mock(available=0, serial=None, serial_pty=serial),
        mock.Mock(
            available=0,
            # ``name=`` is required here: the first positional argument of
            # Mock is ``spec``, not the mock's name.
            serial=mock.Mock(name='another_serial'),
            serial_pty=None
        )
    ]

    handler = DeviceHandler(mocked_instance, 'build')
    handler.duts = duts

    handler.make_device_available(serial)

    # Pin the exact per-DUT outcome instead of only counting matches.
    assert [dut.available for dut in handler.duts] == [1, 1, 0]
855
856
# Cases for test_devicehandler_run_custom_script. Each tuple is:
#   (mock_process, raise_timeout)
# mock_process is the process object yielded by the mocked Popen;
# raise_timeout makes its timed communicate() call raise TimeoutExpired.
TESTDATA_11 = [
    (mock.Mock(pid=0, returncode=0), False),
    (mock.Mock(pid=0, returncode=1), False),
    (mock.Mock(pid=0, returncode=1), True)
]
862
@pytest.mark.parametrize(
    'mock_process, raise_timeout',
    TESTDATA_11,
    ids=['proper script', 'error', 'timeout']
)
def test_devicehandler_run_custom_script(caplog, mock_process, raise_timeout):
    """Check logging and process handling of ``run_custom_script``.

    On timeout the log must mention the script and 'timed out', and the
    process must see ``communicate(timeout=...)``, ``kill()`` and a final
    ``communicate()``. A zero return code must not log any ERROR record; a
    non-zero one must log 'custom script failure' without 'timed out'.
    """
    def fake_communicate(timeout=-1):
        # Only the call that passes a timeout may time out; the follow-up
        # call without one returns normally.
        if raise_timeout and timeout != -1:
            raise subprocess.TimeoutExpired(None, timeout)
        return mock.Mock(), mock.Mock()

    def fake_popen(command, *args, **kwargs):
        # Context-manager stand-in yielding the parametrized process mock.
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock_process),
            __exit__=mock.Mock(return_value=None)
        )

    mock_process.communicate = mock.Mock(side_effect=fake_communicate)

    script = [os.path.join('test', 'script', 'path'), 'arg']
    timeout = 60

    with mock.patch('subprocess.Popen', side_effect=fake_popen):
        DeviceHandler.run_custom_script(script, timeout)

    log_text = caplog.text.lower()

    if raise_timeout:
        for fragment in (str(script), 'timed out'):
            assert fragment in log_text
        mock_process.assert_has_calls(
            [
                mock.call.communicate(timeout=timeout),
                mock.call.kill(),
                mock.call.communicate()
            ]
        )
    elif mock_process.returncode == 0:
        assert not any(rec.levelname == 'ERROR' for rec in caplog.records)
    else:
        assert 'timed out' not in log_text
        assert 'custom script failure' in log_text
905
906
# Cases for test_devicehandler_get_hardware. Each tuple is:
#   (num_of_failures, raise_exception)
# num_of_failures is how many times the patched device_is_available returns
# None before yielding a device; raise_exception makes it raise
# TwisterException instead.
TESTDATA_12 = [
    (0, False),
    (4, False),
    (0, True)
]
912
@pytest.mark.parametrize(
    'num_of_failures, raise_exception',
    TESTDATA_12,
    ids=['no failures', 'with failures', 'exception']
)
def test_devicehandler_get_hardware(
    mocked_instance,
    caplog,
    num_of_failures,
    raise_exception
):
    """Check ``DeviceHandler.get_hardware`` against a patched availability.

    ``device_is_available`` is replaced by a helper that returns ``None``
    ``num_of_failures`` times before returning a device, or raises
    ``TwisterException``. On exception the message must be logged and the
    instance marked failed with that message as its reason; otherwise the
    device must be returned.
    """
    expected_hardware = mock.Mock()

    def mock_availability(handler, instance):
        if raise_exception:
            raise TwisterException('dummy message')
        # ``handler.no`` holds the number of "no device yet" answers left.
        if handler.no:
            handler.no -= 1
            return None
        return expected_hardware

    handler = DeviceHandler(mocked_instance, 'build')
    handler.no = num_of_failures

    with mock.patch.object(
        DeviceHandler,
        'device_is_available',
        mock_availability
    ):
        hardware = handler.get_hardware()

    if raise_exception:
        assert 'dummy message' in caplog.text.lower()
        assert mocked_instance.status == 'failed'
        assert mocked_instance.reason == 'dummy message'
    else:
        assert hardware == expected_hardware
950
951
# Pieces shared by the expected `west flash` command lines below.
# '$build_dir' is a placeholder; the test replaces it with the handler's
# actual build directory before comparing.
_TD13_WEST_FLASH = ['west', 'flash', '--skip-rebuild', '-d', '$build_dir']
_TD13_RUNNER_PARAMS = ['param1', 'param2']
_TD13_HLA_SERIAL = ['--', '--cmd-pre-init', 'hla_serial 12345']

# Each case is (self_west_flash, runner, hardware_product_name, expected).
TESTDATA_13 = [
    # No west flash option at all: the generator command is used.
    (None, None, None, ['generator_cmd', '-C', '$build_dir', 'flash']),
    # West flash enabled, with zero, one and several extra values.
    ([], None, None, list(_TD13_WEST_FLASH)),
    ('--dummy', None, None, _TD13_WEST_FLASH + ['--', '--dummy']),
    ('--dummy1,--dummy2', None, None,
     _TD13_WEST_FLASH + ['--', '--dummy1', '--dummy2']),
    # Runner-specific command lines.
    (None, 'runner', 'product',
     _TD13_WEST_FLASH + ['--runner', 'runner'] + _TD13_RUNNER_PARAMS),
    (None, 'pyocd', 'product',
     _TD13_WEST_FLASH + ['--runner', 'pyocd'] + _TD13_RUNNER_PARAMS
     + ['--', '--dev-id', 12345]),
    (None, 'nrfjprog', 'product',
     _TD13_WEST_FLASH + ['--runner', 'nrfjprog'] + _TD13_RUNNER_PARAMS
     + ['--', '--dev-id', 12345]),
    (None, 'openocd', 'STM32 STLink',
     _TD13_WEST_FLASH + ['--runner', 'openocd'] + _TD13_RUNNER_PARAMS
     + _TD13_HLA_SERIAL),
    (None, 'openocd', 'STLINK-V3',
     _TD13_WEST_FLASH + ['--runner', 'openocd'] + _TD13_RUNNER_PARAMS
     + _TD13_HLA_SERIAL),
    (None, 'openocd', 'EDBG CMSIS-DAP',
     _TD13_WEST_FLASH + ['--runner', 'openocd'] + _TD13_RUNNER_PARAMS
     + ['--', '--cmd-pre-init', 'cmsis_dap_serial 12345']),
    # The jlink tool option deliberately contains two consecutive spaces.
    (None, 'jlink', 'product',
     _TD13_WEST_FLASH
     + ['--runner', 'jlink', '--tool-opt=-SelectEmuBySN  12345']
     + _TD13_RUNNER_PARAMS),
    (None, 'stm32cubeprogrammer', 'product',
     _TD13_WEST_FLASH
     + ['--runner', 'stm32cubeprogrammer', '--tool-opt=sn=12345']
     + _TD13_RUNNER_PARAMS),
]
1044
# Values for the 'hardware_probe' parameter: True puts the serial number in
# probe_id, False puts it in id.
TESTDATA_13_2 = [True, False]
1046
@pytest.mark.parametrize(
    'self_west_flash, runner, hardware_product_name, expected',
    TESTDATA_13,
    ids=[
        'generator',
        '--west-flash',
        'one west flash value',
        'multiple west flash values',
        'generic runner',
        'pyocd',
        'nrfjprog',
        'openocd, STM32 STLink',
        'openocd, STLINK-v3',
        'openocd, EDBG CMSIS-DAP',
        'jlink',
        'stm32cubeprogrammer',
    ]
)
@pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id'])
def test_devicehandler_create_command(
    mocked_instance,
    self_west_flash,
    runner,
    hardware_probe,
    hardware_product_name,
    expected
):
    """Check the command list returned by DeviceHandler._create_command().

    The hardware mock carries the serial number 12345 either in probe_id
    (hardware_probe is true) or in id (hardware_probe is false); both
    variants are compared against the same expected command.
    """
    device_handler = DeviceHandler(mocked_instance, 'build')
    device_handler.options = mock.Mock(west_flash=self_west_flash)
    device_handler.generator_cmd = 'generator_cmd'

    # Exactly one of the two identifiers is populated.
    if hardware_probe:
        probe_id, board_id = 12345, None
    else:
        probe_id, board_id = None, 12345

    hardware = mock.Mock(
        product=hardware_product_name,
        probe_id=probe_id,
        id=board_id,
        runner_params=['param1', 'param2']
    )

    # Swap the '$build_dir' placeholder for the handler's real build dir.
    wanted = []
    for token in expected:
        if token == '$build_dir':
            wanted.append(device_handler.build_dir)
        else:
            wanted.append(token)

    assert device_handler._create_command(runner, hardware) == wanted
1082
1083
# Cases for test_devicehandler_update_instance_info. Each tuple is
# (harness_state, flash_error, expected_status, expected_reason,
#  do_add_missing), in the order of the parametrize argument string.
# 'Unknown' is the reason the mocked_instance fixture starts with, so it
# marks the cases in which the reason is expected to stay untouched.
TESTDATA_14 = [
    ('success', False, 'success', 'Unknown', False),
    ('failed', False, 'failed', 'Failed', True),
    ('error', False, 'error', 'Unknown', True),
    (None, True, None, 'Unknown', False),
    (None, False, 'failed', 'Timeout', True),
]
1091
@pytest.mark.parametrize(
    'harness_state, flash_error, expected_status, expected_reason,'
    ' do_add_missing',
    TESTDATA_14,
    ids=['success', 'failed', 'error', 'flash error', 'no status']
)
def test_devicehandler_update_instance_info(
    mocked_instance,
    harness_state,
    flash_error,
    expected_status,
    expected_reason,
    do_add_missing
):
    """Check the instance fields set by DeviceHandler._update_instance_info().

    The execution time, status and reason are verified for every case; the
    add_missing_case_status() call is verified only when do_add_missing is
    set.
    """
    elapsed = 59
    add_missing_mock = mock.Mock()

    device_handler = DeviceHandler(mocked_instance, 'build')
    device_handler.instance.add_missing_case_status = add_missing_mock

    device_handler._update_instance_info(harness_state, elapsed, flash_error)

    instance = device_handler.instance
    assert instance.execution_time == elapsed
    assert instance.status == expected_status
    assert instance.reason == expected_reason

    if not do_add_missing:
        return

    add_missing_mock.assert_called_with('blocked', expected_reason)
1120
1121
# Cases for test_devicehandler_create_serial_connection. Each tuple is
# (serial_device, serial_pty, ser_pty_process, expected_exception,
#  expected_result, terminate_ser_pty_process, make_available).
# make_available is either False (no check) or the exact argument the test
# expects make_device_available() to have been called with.
TESTDATA_15 = [
    # Serial port opens successfully.
    ('dummy device', 'dummy pty', None, None, True, False, False),
    # Opening fails while a serial-pty process exists.
    (
        'dummy device',
        'dummy pty',
        mock.Mock(communicate=mock.Mock(return_value=('', ''))),
        SerialException,
        False,
        True,
        'dummy pty'
    ),
    # Opening fails with no serial pty configured.
    (
        'dummy device',
        None,
        None,
        SerialException,
        False,
        False,
        'dummy device'
    )
]
1143
@pytest.mark.parametrize(
    'serial_device, serial_pty, ser_pty_process, expected_exception,' \
    ' expected_result, terminate_ser_pty_process, make_available',
    TESTDATA_15,
    ids=['valid', 'serial pty process', 'no serial pty']
)
def test_devicehandler_create_serial_connection(
    mocked_instance,
    serial_device,
    serial_pty,
    ser_pty_process,
    expected_exception,
    expected_result,
    terminate_ser_pty_process,
    make_available
):
    """Check DeviceHandler._create_serial_connection() on success and failure.

    serial.Serial is replaced by a mock that either returns expected_result
    or raises expected_exception. On failure the test verifies the instance
    status/reason, the add_missing_case_status() call and, where the case
    asks for it, the termination of the serial-pty process and the
    make_device_available() call.
    """
    def mock_serial(*args, **kwargs):
        if expected_exception:
            raise expected_exception('')
        return expected_result

    handler = DeviceHandler(mocked_instance, 'build')
    missing_mock = mock.Mock()
    handler.instance.add_missing_case_status = missing_mock
    available_mock = mock.Mock()
    handler.make_device_available = available_mock
    handler.options = mock.Mock(timeout_multiplier=1)

    hardware_baud = 14400
    flash_timeout = 60
    serial_mock = mock.Mock(side_effect=mock_serial)
    terminate_mock = mock.Mock()

    raises_context = pytest.raises(expected_exception) \
        if expected_exception else nullcontext()

    # terminate_process is patched through mock.patch so the original
    # function is restored when the block exits and the mock cannot leak
    # into other tests of this module.
    with mock.patch('serial.Serial', serial_mock), \
         mock.patch('twisterlib.handlers.terminate_process',
                    terminate_mock), \
         raises_context:
        result = handler._create_serial_connection(serial_device, hardware_baud,
                                                   flash_timeout, serial_pty,
                                                   ser_pty_process)

    if expected_result:
        assert result is not None

    if expected_exception:
        assert handler.instance.status == 'failed'
        assert handler.instance.reason == 'Serial Device Error'

        missing_mock.assert_called_once_with('blocked', 'Serial Device Error')

    if terminate_ser_pty_process:
        terminate_mock.assert_called_once()
        ser_pty_process.communicate.assert_called_once()

    if make_available:
        available_mock.assert_called_once_with(make_available)
1200
1201
# Cases for test_devicehandler_get_serial_device. Each tuple is
# (serial_pty, popen_exception, expected_device).
# 'slave name' is what the test's os.ttyname stub (x + ' name') yields for
# the 'slave' element returned by its pty.openpty stub.
TESTDATA_16 = [
    ('dummy1 dummy2', None, 'slave name'),
    ('dummy1,dummy2', CalledProcessError, None),
    (None, None, 'dummy hardware serial'),
]
1207
@pytest.mark.parametrize(
    'serial_pty, popen_exception, expected_device',
    TESTDATA_16,
    ids=['pty', 'pty process error', 'no pty']
)
def test_devicehandler_get_serial_device(
    mocked_instance,
    serial_pty,
    popen_exception,
    expected_device
):
    """Check the device returned by DeviceHandler._get_serial_device().

    subprocess.Popen, pty.openpty and os.ttyname are stubbed. When the Popen
    stub raises, the method is expected to return None; otherwise the first
    element of its result must equal expected_device.
    """
    def fake_popen(command, *args, **kwargs):
        # Both pty cases must produce the same split command.
        assert command == ['dummy1', 'dummy2']
        if not popen_exception:
            return mock.Mock()
        raise popen_exception(command, 'Dummy error')

    device_handler = DeviceHandler(mocked_instance, 'build')

    with mock.patch('subprocess.Popen',
                    mock.Mock(side_effect=fake_popen)), \
         mock.patch('pty.openpty',
                    mock.Mock(return_value=('master', 'slave'))), \
         mock.patch('os.ttyname',
                    mock.Mock(side_effect=lambda x: x + ' name')):
        result = device_handler._get_serial_device(serial_pty,
                                                   'dummy hardware serial')

    if popen_exception:
        assert result is None
        return

    assert result[0] == expected_device
1241
# Cases for test_devicehandler_handle. Each tuple is
# (has_hardware, raise_create_serial, raise_popen, raise_timeout,
#  returncode, do_timeout_thread, use_pty,
#  expected_status, expected_reason, expected_logs).
# expected_logs lists messages that must be present among the captured log
# records; an empty list means no log message is required.
TESTDATA_17 = [
    (False, False, False, False, None, False, False,
     None, None, []),
    (True, True, False, False, None, False, False,
     None, None, []),
    (True, False, True, False, None, False, False,
     'error', 'Device issue (Flash error)', []),
    (True, False, False, True, None, False, False,
     'error', 'Device issue (Timeout)', ['Flash operation timed out.']),
    (True, False, False, False, 1, False, False,
     'error', 'Device issue (Flash error?)', []),
    (True, False, False, False, 0, True, False,
     None, None, ['Timed out while monitoring serial output on IPName']),
    (True, False, False, False, 0, False, True,
     None, None, ["Terminating serial-pty:'Serial PTY'",
                  "Terminated serial-pty:'Serial PTY', stdout:'', stderr:''"]),
]
1259
@pytest.mark.parametrize(
    'has_hardware, raise_create_serial, raise_popen, raise_timeout,' \
    ' returncode, do_timeout_thread, use_pty,' \
    ' expected_status, expected_reason, expected_logs',
    TESTDATA_17,
    # The last two ids follow the use_pty column: the sixth case runs on a
    # plain serial device, the seventh on a serial pty.
    ids=['no hardware', 'create serial failure', 'popen called process error',
         'communicate timeout', 'nonzero returncode', 'valid dev', 'valid pty']
)
def test_devicehandler_handle(
    mocked_instance,
    caplog,
    has_hardware,
    raise_create_serial,
    raise_popen,
    raise_timeout,
    returncode,
    do_timeout_thread,
    use_pty,
    expected_status,
    expected_reason,
    expected_logs
):
    """Drive DeviceHandler.handle() with every collaborator mocked out.

    The handler's helper methods, subprocess.Popen, threading and open() are
    replaced by mocks whose behaviour is selected by the case flags. The
    test then checks the captured log messages, the custom scripts that
    were run, the instance status/reason and the device that was released.
    """
    def mock_get_serial(serial_pty, hardware_serial):
        if serial_pty:
            serial_pty_process = mock.Mock(
                name='dummy serial PTY process',
                communicate=mock.Mock(
                    return_value=('', '')
                )
            )
            return 'dummy serial PTY device', serial_pty_process
        return 'dummy serial device', None

    def mock_create_serial(*args, **kwargs):
        if raise_create_serial:
            raise SerialException('dummy cmd', 'dummy msg')
        return mock.Mock(name='dummy serial')

    def mock_thread(*args, **kwargs):
        # A thread that is still alive stands for a monitor timeout.
        is_alive_mock = mock.Mock(return_value=bool(do_timeout_thread))

        return mock.Mock(is_alive=is_alive_mock)

    def mock_terminate(proc, *args, **kwargs):
        proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock()))

    def mock_communicate(*args, **kwargs):
        if raise_timeout:
            raise TimeoutExpired('dummy cmd', 'dummyamount')
        return mock.Mock(), mock.Mock()

    def mock_popen(command, *args, **kwargs):
        if raise_popen:
            raise CalledProcessError('dummy proc', 'dummy msg')

        mock_process = mock.Mock(
            pid=1,
            returncode=returncode,
            communicate=mock.Mock(side_effect=mock_communicate)
        )

        # Popen is used as a context manager by the code under test.
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock_process),
            __exit__=mock.Mock(return_value=None)
        )

    hardware = None if not has_hardware else mock.Mock(
        baud=14400,
        runner='dummy runner',
        serial_pty='Serial PTY' if use_pty else None,
        serial='dummy serial',
        pre_script='dummy pre script',
        post_script='dummy post script',
        post_flash_script='dummy post flash script',
        flash_timeout=60,
        flash_with_test=True
    )

    handler = DeviceHandler(mocked_instance, 'build')
    handler.get_hardware = mock.Mock(return_value=hardware)
    handler.options = mock.Mock(
        timeout_multiplier=1,
        west_flash=None,
        west_runner=None
    )
    handler._get_serial_device = mock.Mock(side_effect=mock_get_serial)
    handler._create_command = mock.Mock(return_value=['dummy', 'command'])
    handler.run_custom_script = mock.Mock()
    handler._create_serial_connection = mock.Mock(
        side_effect=mock_create_serial
    )
    handler.monitor_serial = mock.Mock()
    handler.terminate = mock.Mock(side_effect=mock_terminate)
    handler._update_instance_info = mock.Mock()
    handler._final_handle_actions = mock.Mock()
    handler.make_device_available = mock.Mock()
    handler.instance.platform.name = 'IPName'

    harness = mock.Mock()

    # terminate_process is patched only for the duration of handle() so the
    # real function is restored afterwards instead of staying replaced for
    # the rest of the test session.
    with mock.patch('builtins.open', mock.mock_open(read_data='')), \
         mock.patch('subprocess.Popen', side_effect=mock_popen), \
         mock.patch('threading.Event', mock.Mock()), \
         mock.patch('threading.Thread', side_effect=mock_thread), \
         mock.patch('twisterlib.handlers.terminate_process', mock.Mock()):
        handler.handle(harness)

    handler.get_hardware.assert_called_once()

    # Assert per message so a failure names the missing log line.
    messages = [record.msg for record in caplog.records]
    for msg in expected_logs:
        assert msg in messages

    if not has_hardware:
        return

    handler.run_custom_script.assert_has_calls([
        mock.call('dummy pre script', mock.ANY)
    ])

    if raise_create_serial:
        return

    handler.run_custom_script.assert_has_calls([
        mock.call('dummy pre script', mock.ANY),
        mock.call('dummy post flash script', mock.ANY),
        mock.call('dummy post script', mock.ANY)
    ])

    if expected_reason:
        assert handler.instance.reason == expected_reason
    if expected_status:
        assert handler.instance.status == expected_status

    handler.make_device_available.assert_called_once_with(
        'Serial PTY' if use_pty else 'dummy serial device'
    )
1396
1397
# Cases for test_qemuhandler_init. Each tuple is
# (ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof).
TESTDATA_18 = [
    (True, True, True),
    (False, False, False),
]
1402
@pytest.mark.parametrize(
    'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof',
    TESTDATA_18,
    ids=['ignore crash', 'qemu crash']
)
def test_qemuhandler_init(
    mocked_instance,
    ignore_qemu_crash,
    expected_ignore_crash,
    expected_ignore_unexpected_eof
):
    """Check the crash/EOF flags a new QEMUHandler exposes.

    The testsuite's ignore_qemu_crash setting is assigned before the handler
    is constructed, then both handler flags are compared to the case values.
    """
    mocked_instance.testsuite.ignore_qemu_crash = ignore_qemu_crash

    qemu_handler = QEMUHandler(mocked_instance, 'build')

    observed = (
        qemu_handler.ignore_qemu_crash,
        qemu_handler.ignore_unexpected_eof,
    )
    assert observed[0] == expected_ignore_crash
    assert observed[1] == expected_ignore_unexpected_eof
1420
1421
def test_qemuhandler_get_cpu_time():
    """Check that _get_cpu_time() reports 84.0 for user=20.0, system=64.0."""
    def mock_process(pid):
        # Stand-in for psutil.Process: cpu_times() yields fixed figures.
        times = mock.Mock(user=20.0, system=64.0)
        return mock.Mock(cpu_times=mock.Mock(return_value=times))

    with mock.patch('psutil.Process', mock_process):
        cpu_time = QEMUHandler._get_cpu_time(0)

    assert cpu_time == pytest.approx(84.0)
1437
1438
# Cases for test_qemuhandler_get_default_domain_build_dir. Each tuple is
# (self_sysbuild, self_build_dir, build_dir, expected).
# build_dir is a PropertyMock that the test installs on the type of the
# mocked default domain; self_build_dir is assigned to handler.build_dir.
TESTDATA_19 = [
    (
        True,
        os.path.join('self', 'dummy_dir', '1'),
        mock.PropertyMock(return_value=os.path.join('dummy_dir', '1')),
        os.path.join('dummy_dir', '1')
    ),
    (
        False,
        os.path.join('self', 'dummy_dir', '2'),
        mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')),
        os.path.join('self', 'dummy_dir', '2')
    ),
]
1453
@pytest.mark.parametrize(
    'self_sysbuild, self_build_dir, build_dir, expected',
    TESTDATA_19,
    ids=['domains build dir', 'self build dir']
)
def test_qemuhandler_get_default_domain_build_dir(
    mocked_instance,
    self_sysbuild,
    self_build_dir,
    build_dir,
    expected
):
    """Check the directory returned by get_default_domain_build_dir().

    domains.Domains.from_file is patched to return a mock whose default
    domain exposes build_dir through the PropertyMock supplied by the case.
    """
    get_default_domain_mock = mock.Mock()
    default_domain = get_default_domain_mock()
    # Properties live on the type, so the PropertyMock is set there.
    type(default_domain).build_dir = build_dir

    from_file_mock = mock.Mock(
        return_value=mock.Mock(get_default_domain=get_default_domain_mock)
    )

    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.instance.sysbuild = self_sysbuild
    qemu_handler.build_dir = self_build_dir

    with mock.patch('domains.Domains.from_file', from_file_mock):
        assert qemu_handler.get_default_domain_build_dir() == expected
1479
1480
# Cases for test_qemuhandler_set_qemu_filenames. Each tuple is
# (self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn).
# exists_pid_fn is the value the patched os.path.exists returns.
TESTDATA_20 = [
    (
        os.path.join('self', 'dummy_dir', 'log1'),
        os.path.join('self', 'dummy_dir', 'pid1'),
        os.path.join('sysbuild', 'dummy_dir', 'bd1'),
        True
    ),
    (
        os.path.join('self', 'dummy_dir', 'log2'),
        os.path.join('self', 'dummy_dir', 'pid2'),
        os.path.join('sysbuild', 'dummy_dir', 'bd2'),
        False
    ),
]
1495
@pytest.mark.parametrize(
    'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn',
    TESTDATA_20,
    ids=['pid exists', 'pid missing']
)
def test_qemuhandler_set_qemu_filenames(
    mocked_instance,
    self_log,
    self_pid_fn,
    sysbuild_build_dir,
    exists_pid_fn
):
    """Check the file names assigned by QEMUHandler._set_qemu_filenames().

    os.unlink and os.path.exists are patched; when the pid file is reported
    as existing, the test also expects exactly one unlink of that file.
    """
    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.log = self_log
    qemu_handler.pid_fn = self_pid_fn

    unlink_mock = mock.Mock()

    with mock.patch('os.unlink', unlink_mock), \
         mock.patch('os.path.exists',
                    mock.Mock(return_value=exists_pid_fn)):
        qemu_handler._set_qemu_filenames(sysbuild_build_dir)

    expected_fifo_fn = mocked_instance.build_dir + os.path.sep + 'qemu-fifo'
    expected_pid_fn = sysbuild_build_dir + os.path.sep + 'qemu.pid'

    assert qemu_handler.fifo_fn == expected_fifo_fn
    assert qemu_handler.pid_fn == expected_pid_fn
    assert qemu_handler.log_fn == self_log

    if not exists_pid_fn:
        return

    unlink_mock.assert_called_once_with(
        sysbuild_build_dir + os.path.sep + 'qemu.pid'
    )
1529
1530
def test_qemuhandler_create_command(mocked_instance):
    """Check that _create_command() builds '<generator> -C <dir> run'."""
    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.generator_cmd = 'dummy_cmd'

    command = qemu_handler._create_command(
        os.path.join('sysbuild', 'dummy_dir')
    )

    expected_dir = 'sysbuild' + os.path.sep + 'dummy_dir'
    assert command == ['dummy_cmd', '-C', expected_dir, 'run']
1541
1542
# Cases for test_qemuhandler_update_instance_info, one row per case:
# (self_returncode, self_ignore_qemu_crash, self_instance_reason,
#  harness_state, is_timeout,
#  expected_status, expected_reason, expected_called_missing_case)
TESTDATA_21 = [
    (0, False, None, 'good dummy state', False, None, None, False),
    (1, True, None, 'good dummy state', False, None, None, False),
    (0, False, None, None, True, 'failed', 'Timeout', True),
    (1, False, None, None, False, 'failed', 'Exited with 1', True),
    (1, False, 'preexisting reason', 'good dummy state', False,
     'failed', 'preexisting reason', True),
]
1595
@pytest.mark.parametrize(
    'self_returncode, self_ignore_qemu_crash, self_instance_reason,'
    ' harness_state, is_timeout, expected_status, expected_reason,'
    ' expected_called_missing_case',
    TESTDATA_21,
    ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail']
)
def test_qemuhandler_update_instance_info(
    mocked_instance,
    self_returncode,
    self_ignore_qemu_crash,
    self_instance_reason,
    harness_state,
    is_timeout,
    expected_status,
    expected_reason,
    expected_called_missing_case
):
    """Check the status and reason set by QEMUHandler._update_instance_info().

    The return code, crash-ignore flag and any preexisting reason are set up
    first; the add_missing_case_status('blocked') call is verified only for
    cases that expect it.
    """
    missing_case_mock = mock.Mock()
    mocked_instance.add_missing_case_status = missing_case_mock
    mocked_instance.reason = self_instance_reason

    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.returncode = self_returncode
    qemu_handler.ignore_qemu_crash = self_ignore_qemu_crash

    qemu_handler._update_instance_info(harness_state, is_timeout)

    instance = qemu_handler.instance
    assert instance.status == expected_status
    assert instance.reason == expected_reason

    if not expected_called_missing_case:
        return

    mocked_instance.add_missing_case_status.assert_called_once_with('blocked')
1630
1631
def test_qemuhandler_thread_get_fifo_names():
    """Check that the fifo names are the base name plus '.in' and '.out'."""
    fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names('dummy')

    assert (fifo_in, fifo_out) == ('dummy.in', 'dummy.out')
1639
# Every (fifo_in_exists, fifo_out_exists) combination, in the order
# both missing, out exists, in exists, both exist.
TESTDATA_22 = list(itertools.product((False, True), repeat=2))
1646
@pytest.mark.parametrize(
    'fifo_in_exists, fifo_out_exists',
    TESTDATA_22,
    ids=['both missing', 'out exists', 'in exists', 'both exist']
)
def test_qemuhandler_thread_open_files(fifo_in_exists, fifo_out_exists):
    """Check the files opened by QEMUHandler._thread_open_files().

    os.unlink, os.mkfifo, os.path.exists and open() are patched. The three
    expected open() calls are verified, and every fifo reported as existing
    must have been unlinked.
    """
    fifo_in, fifo_out, logfile = 'fifo.in', 'fifo.out', 'log.file'
    existing = {fifo_in: fifo_in_exists, fifo_out: fifo_out_exists}

    def mock_exists(path):
        # Only the two fifo paths may be probed.
        if path not in existing:
            raise ValueError('Unexpected path in mock of os.path.exists')
        return existing[path]

    unlink_mock = mock.Mock()

    with mock.patch('os.unlink', unlink_mock), \
         mock.patch('os.mkfifo', mock.Mock()), \
         mock.patch('os.path.exists', mock.Mock(side_effect=mock_exists)), \
         mock.patch('builtins.open', mock.mock_open()) as open_mock:
        _, _, _ = QEMUHandler._thread_open_files(fifo_in, fifo_out, logfile)

    expected_open_calls = [
        mock.call('fifo.in', 'wb'),
        mock.call('fifo.out', 'rb', buffering=0),
        mock.call('log.file', 'wt'),
    ]
    open_mock.assert_has_calls(expected_open_calls)

    for fifo_path, existed in existing.items():
        if existed:
            unlink_mock.assert_any_call(fifo_path)
1686
1687
# Cases for test_qemuhandler_thread_close_files. Each tuple is
# (is_pid, is_lookup_error): whether a pid is passed in, and whether the
# patched os.kill raises ProcessLookupError.
TESTDATA_23 = [
    (False, False),
    (True, True),
    (True, False)
]
1693
@pytest.mark.parametrize(
    'is_pid, is_lookup_error',
    TESTDATA_23,
    ids=['pid missing', 'pid lookup error', 'pid ok']
)
def test_qemuhandler_thread_close_files(is_pid, is_lookup_error):
    """Check the cleanup done by QEMUHandler._thread_close_files().

    All three file objects must be closed and both fifos unlinked. When a
    pid is given and os.kill does not raise, the pid must have received
    SIGTERM.
    """
    terminated_pids = set()

    def mock_kill(pid, sig):
        if is_lookup_error:
            raise ProcessLookupError(f"Couldn't find pid: {pid}.")
        if sig == signal.SIGTERM:
            terminated_pids.add(pid)

    unlink_mock = mock.Mock()
    out_fp, in_fp, log_out_fp = mock.Mock(), mock.Mock(), mock.Mock()
    pid = 12345 if is_pid else None

    with mock.patch('os.unlink', unlink_mock), \
         mock.patch('os.kill', mock.Mock(side_effect=mock_kill)):
        QEMUHandler._thread_close_files('fifo.in', 'fifo.out', pid, out_fp,
                                        in_fp, log_out_fp)

    for file_mock in (out_fp, in_fp, log_out_fp):
        file_mock.close.assert_called_once()

    unlink_mock.assert_has_calls([mock.call('fifo.in'), mock.call('fifo.out')])

    if is_pid and not is_lookup_error:
        assert pid in terminated_pids
1731
1732
# Cases for test_qemuhandler_thread_update_instance_info. Each tuple is
# (_status, _reason, expected_status, expected_reason).
# In the last case 'Unknown' is the reason the mocked_instance fixture
# starts with.
TESTDATA_24 = [
    ('failed', 'timeout', 'failed', 'timeout'),
    ('failed', 'Execution error', 'failed', 'Execution error'),
    ('failed', 'unexpected eof', 'failed', 'unexpected eof'),
    ('failed', 'unexpected byte', 'failed', 'unexpected byte'),
    (None, None, None, 'Unknown'),
]
1740
@pytest.mark.parametrize(
    '_status, _reason, expected_status, expected_reason',
    TESTDATA_24,
    ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown']
)
def test_qemuhandler_thread_update_instance_info(
    mocked_instance,
    _status,
    _reason,
    expected_status,
    expected_reason
):
    """Check the fields set by QEMUHandler._thread_update_instance_info().

    The method is called with the handler passed explicitly; afterwards the
    instance's execution time, status and reason are verified.
    """
    elapsed = 59
    qemu_handler = QEMUHandler(mocked_instance, 'build')

    QEMUHandler._thread_update_instance_info(
        qemu_handler, elapsed, _status, _reason
    )

    instance = qemu_handler.instance
    assert instance.execution_time == elapsed
    assert instance.status == expected_status
    assert instance.reason == expected_reason
1762
1763
# Cases for test_qemuhandler_thread. Each tuple is
# (content, timeout, pid, harness_states, cputime, capture_coverage,
#  expected_status, expected_reason, expected_log_calls).
# content is the byte stream served for the '<fifo>.out' file, pid is the
# number written to the pid file, harness_states is the sequence returned by
# successive reads of harness.state, and expected_log_calls are write()
# calls that must appear on the log file mock.
TESTDATA_25 = [
    (
        ('1\n' * 60).encode('utf-8'),
        60,
        1,
        [None] * 60 + ['success'] * 6,
        1000,
        False,
        'failed',
        'timeout',
        [mock.call('1\n'), mock.call('1\n')]
    ),
    (
        ('1\n' * 60).encode('utf-8'),
        60,
        -1,
        [None] * 60 + ['success'] * 30,
        100,
        False,
        'failed',
        None,
        [mock.call('1\n'), mock.call('1\n')]
    ),
    (
        b'',
        60,
        1,
        ['success'] * 3,
        100,
        False,
        'failed',
        'unexpected eof',
        []
    ),
    (
        b'\x81',
        60,
        1,
        ['success'] * 3,
        100,
        False,
        'failed',
        'unexpected byte',
        []
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        600,
        1,
        [None] * 3 + ['success'] * 7,
        100,
        False,
        'success',
        None,
        [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')]
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        600,
        0,
        [None] * 3 + ['success'] * 7,
        100,
        False,
        'failed',
        'timeout',
        [mock.call('1\n'), mock.call('2\n')]
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        60,
        1,
        [None] * 3 + ['success'] * 7,
        # Generator instead of an int: the test draws one value per CPU-time
        # query. NOTE(review): it is created once at import, so it can only
        # serve a single run of this case per process.
        (n for n in [100, 100, 10000]),
        True,
        'success',
        None,
        [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')]
    ),
]
1843
@pytest.mark.parametrize(
    'content, timeout, pid, harness_states, cputime, capture_coverage,' \
    ' expected_status, expected_reason, expected_log_calls',
    TESTDATA_25,
    ids=[
        'timeout',
        'harness failed',
        'unexpected eof',
        'unexpected byte',
        'harness success',
        'timeout by pid=0',
        'capture_coverage'
    ]
)
def test_qemuhandler_thread(
    mocked_instance,
    faux_timer,
    content,
    timeout,
    pid,
    harness_states,
    cputime,
    capture_coverage,
    expected_status,
    expected_reason,
    expected_log_calls
):
    """Run QEMUHandler._thread() against fully mocked files, time and polling.

    time.time is driven by the faux_timer fixture, select.poll returns a
    poller with scripted results, and the fifo/pid files are served by
    mocked_open. The test verifies the status passed to
    _thread_update_instance_info() and the writes made to the log file.

    Note: expected_reason is part of the parametrization, but the final
    assertion matches the reason argument with mock.ANY.
    """
    def mock_cputime(pid):
        # An int cputime is returned as-is; anything else is treated as an
        # iterator yielding one value per query. Non-positive pids behave
        # like a missing process.
        if pid > 0:
            return cputime if isinstance(cputime, int) else next(cputime)
        else:
            raise ProcessLookupError()

    type(mocked_instance.testsuite).timeout = mock.PropertyMock(return_value=timeout)
    handler = QEMUHandler(mocked_instance, 'build')
    handler.ignore_unexpected_eof = False
    handler.pid_fn = 'pid_fn'
    handler.fifo_fn = 'fifo_fn'
    handler.options = mock.Mock(timeout_multiplier=1)

    def mocked_open(filename, *args, **kwargs):
        # Serve the pid for the pid file, the case content for the fifo
        # output file, and an empty stream for anything else.
        if filename == handler.pid_fn:
            contents = str(pid).encode('utf-8')
        elif filename == handler.fifo_fn + '.out':
            contents = content
        else:
            contents = b''

        file_object = mock.mock_open(read_data=contents).return_value
        file_object.__iter__.return_value = contents.splitlines(True)
        return file_object

    harness = mock.Mock(capture_coverage=capture_coverage, handle=print)
    # Each read of harness.state consumes the next scripted value.
    type(harness).state = mock.PropertyMock(side_effect=harness_states)

    p = mock.Mock()
    # poll() yields True four times, then False, and repeats indefinitely.
    p.poll = mock.Mock(
        side_effect=itertools.cycle([True, True, True, True, False])
    )

    mock_thread_get_fifo_names = mock.Mock(
        return_value=('fifo_fn.in', 'fifo_fn.out')
    )
    log_fp_mock = mock.Mock()
    # File object backed by the case's content bytes.
    in_fp_mock = mocked_open('fifo_fn.out')
    out_fp_mock = mock.Mock()
    mock_thread_open_files = mock.Mock(
        return_value=(out_fp_mock, in_fp_mock, log_fp_mock)
    )
    mock_thread_close_files = mock.Mock()
    mock_thread_update_instance_info = mock.Mock()

    with mock.patch('time.time', side_effect=faux_timer.time), \
         mock.patch('builtins.open', new=mocked_open), \
         mock.patch('select.poll', return_value=p), \
         mock.patch('os.path.exists', return_value=True), \
         mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time',
                    mock_cputime), \
         mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names',
                    mock_thread_get_fifo_names), \
         mock.patch('twisterlib.handlers.QEMUHandler._thread_open_files',
                    mock_thread_open_files), \
         mock.patch('twisterlib.handlers.QEMUHandler._thread_close_files',
                    mock_thread_close_files), \
         mock.patch('twisterlib.handlers.QEMUHandler.' \
                    '_thread_update_instance_info',
                    mock_thread_update_instance_info):
        QEMUHandler._thread(
            handler,
            handler.get_test_timeout(),
            handler.build_dir,
            handler.log,
            handler.fifo_fn,
            handler.pid_fn,
            harness,
            handler.ignore_unexpected_eof
        )

    # Only the handler and the status are pinned; time and reason are ANY.
    mock_thread_update_instance_info.assert_called_once_with(
        handler,
        mock.ANY,
        expected_status,
        mock.ANY
    )

    log_fp_mock.write.assert_has_calls(expected_log_calls)
1950
1951
# Cases for test_qemuhandler_handle. Each tuple is
# (isatty, do_timeout, harness_state, exists_pid_fn, expected_logs).
# expected_logs are substrings that must occur in the captured log text.
TESTDATA_26 = [
    (True, False, None, True,
     ['No timeout, return code from QEMU (1): 1',
      'return code from QEMU (1): 1']),
    (False, True, 'passed', True, ['return code from QEMU (1): 0']),
    (False, True, 'failed', False, ['return code from QEMU (None): 1']),
]
1959
@pytest.mark.parametrize(
    'isatty, do_timeout, harness_state, exists_pid_fn, expected_logs',
    TESTDATA_26,
    ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn']
)
def test_qemuhandler_handle(
    mocked_instance,
    caplog,
    tmp_path,
    isatty,
    do_timeout,
    harness_state,
    exists_pid_fn,
    expected_logs
):
    """Run QEMUHandler.handle() with its collaborators mocked out.

    subprocess.Popen, threading.Thread, open(), os.path.exists, os.unlink
    and sys.stdout.isatty are patched, as are the handler's own helper
    methods. The test passes when every expected message occurs in the
    captured log text.
    """
    qemu_handler = QEMUHandler(mocked_instance, 'build')

    def mock_wait(*args, **kwargs):
        if do_timeout:
            raise TimeoutExpired('dummy cmd', 'dummyamount')

    def mock_path_exists(name, *args, **kwargs):
        return exists_pid_fn

    qemu_process = mock.Mock(
        pid=0,
        returncode=1,
        communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock())),
        wait=mock.Mock(side_effect=mock_wait)
    )

    def mock_popen(command, stdout=None, stdin=None, stderr=None, cwd=None):
        # Usable as a context manager that yields the fake QEMU process.
        return mock.Mock(
            __enter__=mock.Mock(return_value=qemu_process),
            __exit__=mock.Mock(return_value=None),
            communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock()))
        )

    def mock_thread(name=None, target=None, daemon=None, args=None):
        return mock.Mock()

    def mock_filenames(sysbuild_build_dir):
        qemu_handler.fifo_fn = os.path.join('dummy', 'qemu-fifo')
        qemu_handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid')
        qemu_handler.log_fn = os.path.join('dummy', 'log')

    qemu_handler.options = mock.Mock(
        timeout_multiplier=1,
        west_flash=[],
        west_runner=None
    )
    qemu_handler.run_custom_script = mock.Mock(return_value=None)
    qemu_handler.make_device_available = mock.Mock(return_value=None)
    qemu_handler._final_handle_actions = mock.Mock(return_value=None)
    qemu_handler._create_command = mock.Mock(
        return_value=['generator_cmd', '-C', os.path.join('cmd', 'path'),
                      'run']
    )
    qemu_handler._set_qemu_filenames = mock.Mock(side_effect=mock_filenames)
    qemu_handler.get_default_domain_build_dir = mock.Mock(
        return_value=os.path.join('sysbuild', 'dummydir')
    )
    qemu_handler.terminate = mock.Mock()

    harness = mock.Mock(state=harness_state)

    with mock.patch('subprocess.Popen', side_effect=mock_popen), \
         mock.patch('builtins.open', mock.mock_open(read_data='1')), \
         mock.patch('threading.Thread', side_effect=mock_thread), \
         mock.patch('os.path.exists', side_effect=mock_path_exists), \
         mock.patch('os.unlink', mock.Mock()), \
         mock.patch('sys.stdout.isatty', return_value=isatty):
        qemu_handler.handle(harness)

    for expected_log in expected_logs:
        assert expected_log in caplog.text
2035
2036
def test_qemuhandler_get_fifo(mocked_instance):
    """
    Check that get_fifo() returns the value assigned to the handler's
    fifo_fn attribute.
    """
    expected_fifo = 'fifo_fn'

    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.fifo_fn = expected_fifo

    assert qemu_handler.get_fifo() == expected_fifo
2044