1# Copyright (c) 2020 Vestas Wind Systems A/S
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import argparse
6import os
7from unittest.mock import patch, call
8
9import pytest
10
11from runners.canopen_program import CANopenBinaryRunner
12from conftest import RC_KERNEL_BIN
13
14#
15# Test values
16#
17
18TEST_DEF_CONTEXT = 'default'
19TEST_ALT_CONTEXT = 'alternate'
20
21#
22# Test cases
23#
24
25TEST_CASES = [(n, x, p, c, o, t, r, s, d, b)
26              for n in range(1, 3)
27              for x in (None, TEST_ALT_CONTEXT)
28              for p in range(1, 3)
29              for c in (False, True)
30              for o in (False, True)
31              for t in range(1, 3)
32              for r in range(1, 3)
33              for s in range(1, 3)
34              for d in [256, 1024]
35              for b in range(False, True)]
36
37os_path_isfile = os.path.isfile
38
39def os_path_isfile_patch(filename):
40    if filename == RC_KERNEL_BIN:
41        return True
42    return os_path_isfile(filename)
43
44@pytest.mark.parametrize('test_case', TEST_CASES)
45@patch('runners.canopen_program.CANopenProgramDownloader')
46def test_canopen_program_create(cpd, test_case, runner_config):
47    '''Test CANopen runner created from command line parameters.'''
48    node_id, context, program_number, confirm, confirm_only, timeout, sdo_retries, sdo_timeout, download_buffer_size, block_transfer = test_case
49
50    args = ['--node-id', str(node_id)]
51    if context is not None:
52        args.extend(['--can-context', context])
53    if program_number:
54        args.extend(['--program-number', str(program_number)])
55    if not confirm:
56        args.append('--no-confirm')
57    if confirm_only:
58        args.append('--confirm-only')
59    if timeout:
60        args.extend(['--timeout', str(timeout)])
61    if sdo_retries:
62        args.extend(['--sdo-retries', str(sdo_retries)])
63    if sdo_timeout:
64        args.extend(['--sdo-timeout', str(sdo_timeout)])
65    if download_buffer_size:
66        args.extend(['--download-buffer-size', str(download_buffer_size)])
67    if block_transfer:
68        args.append('--block_transfer')
69
70    mock = cpd.return_value
71    mock.flash_status.return_value = 0
72    mock.wait_for_flash_status_ok.return_value = 0
73    mock.swid.return_value = 0
74
75    parser = argparse.ArgumentParser(allow_abbrev=False)
76    CANopenBinaryRunner.add_parser(parser)
77    arg_namespace = parser.parse_args(args)
78    runner = CANopenBinaryRunner.create(runner_config, arg_namespace)
79    with patch('os.path.isfile', side_effect=os_path_isfile_patch):
80        runner.run('flash')
81
82    cpd.assert_called_once()
83    if context:
84        assert cpd.call_args == call(node_id=node_id,
85                                     can_context=context,
86                                     logger=runner.logger,
87                                     program_number=program_number,
88                                     sdo_retries=sdo_retries,
89                                     sdo_timeout=sdo_timeout,
90                                     download_buffer_size=download_buffer_size,
91                                     block_transfer=block_transfer)
92    else:
93        assert cpd.call_args == call(node_id=node_id,
94                                     can_context=TEST_DEF_CONTEXT,
95                                     logger=runner.logger,
96                                     program_number=program_number,
97                                     sdo_retries=sdo_retries,
98                                     sdo_timeout=sdo_timeout,
99                                     download_buffer_size=download_buffer_size,
100                                     block_transfer=block_transfer)
101
102    mock.connect.assert_called_once()
103
104    if confirm_only:
105        mock.wait_for_flash_status_ok.assert_called_with(timeout)
106        mock.swid.assert_called_once()
107        mock.enter_pre_operational.assert_called_once()
108        mock.zephyr_confirm_program.assert_called_once()
109        mock.clear_program.assert_not_called()
110        mock.stop_program.assert_not_called()
111        mock.download.assert_not_called()
112        mock.start_program.assert_not_called()
113        mock.wait_for_bootup.assert_not_called()
114    else:
115        mock.enter_pre_operational.assert_called()
116        mock.wait_for_flash_status_ok.assert_called_with(timeout)
117        mock.swid.assert_called()
118        mock.stop_program.assert_called_once()
119        mock.clear_program.assert_called_once()
120        mock.download.assert_called_once_with(RC_KERNEL_BIN)
121        mock.start_program.assert_called_once()
122        mock.wait_for_bootup.assert_called_once_with(timeout)
123        if confirm:
124            mock.zephyr_confirm_program.assert_called_once()
125        else:
126            mock.zephyr_confirm_program.assert_not_called()
127
128    mock.disconnect.assert_called_once()
129