# Copyright (c) 2020 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import itertools
import os
from unittest.mock import patch, call

import pytest

from runners.canopen_program import CANopenBinaryRunner
from conftest import RC_KERNEL_BIN

#
# Test values
#

# CAN context the test expects the downloader to receive when
# --can-context is not given on the command line.
TEST_DEF_CONTEXT = 'default'
TEST_ALT_CONTEXT = 'alternate'

#
# Test cases
#

# Cartesian product of every parameter axis. The tuple order must match the
# unpacking at the top of test_canopen_program_create().
TEST_CASES = list(itertools.product(
    range(1, 3),               # node_id
    (None, TEST_ALT_CONTEXT),  # context (None: option omitted)
    range(1, 3),               # program_number
    (False, True),             # confirm
    (False, True),             # confirm_only
    range(1, 3),               # timeout
    range(1, 3),               # sdo_retries
    range(1, 3),               # sdo_timeout
    (256, 1024),               # download_buffer_size
    # Previously range(False, True), i.e. range(0, 1), which only ever
    # produced 0 and therefore never exercised block transfers.
    (False, True),             # block_transfer
))

# Keep a reference to the real implementation so the patch below can
# delegate to it after os.path.isfile has been replaced.
os_path_isfile = os.path.isfile


def os_path_isfile_patch(filename):
    '''Stand-in for os.path.isfile that reports RC_KERNEL_BIN as existing.

    Any other path is forwarded to the real os.path.isfile.
    '''
    if filename == RC_KERNEL_BIN:
        return True
    return os_path_isfile(filename)


@pytest.mark.parametrize('test_case', TEST_CASES)
@patch('runners.canopen_program.CANopenProgramDownloader')
def test_canopen_program_create(cpd, test_case, runner_config):
    '''Test CANopen runner created from command line parameters.

    Builds a command line from one TEST_CASES tuple, creates the runner from
    the parsed arguments, runs the 'flash' command against a mocked
    CANopenProgramDownloader, and checks both the downloader constructor
    arguments and which downloader methods were (not) called.
    '''
    (node_id, context, program_number, confirm, confirm_only, timeout,
     sdo_retries, sdo_timeout, download_buffer_size,
     block_transfer) = test_case

    args = ['--node-id', str(node_id)]
    if context is not None:
        args.extend(['--can-context', context])
    if program_number:
        args.extend(['--program-number', str(program_number)])
    if not confirm:
        args.append('--no-confirm')
    if confirm_only:
        args.append('--confirm-only')
    if timeout:
        args.extend(['--timeout', str(timeout)])
    if sdo_retries:
        args.extend(['--sdo-retries', str(sdo_retries)])
    if sdo_timeout:
        args.extend(['--sdo-timeout', str(sdo_timeout)])
    if download_buffer_size:
        args.extend(['--download-buffer-size', str(download_buffer_size)])
    if block_transfer:
        # NOTE(review): this was '--block_transfer', but the branch was never
        # reached. Spelled with a hyphen to match every other option used
        # here; confirm against CANopenBinaryRunner's argument parser.
        args.append('--block-transfer')

    mock = cpd.return_value
    mock.flash_status.return_value = 0
    mock.wait_for_flash_status_ok.return_value = 0
    mock.swid.return_value = 0

    parser = argparse.ArgumentParser(allow_abbrev=False)
    CANopenBinaryRunner.add_parser(parser)
    arg_namespace = parser.parse_args(args)
    runner = CANopenBinaryRunner.create(runner_config, arg_namespace)
    with patch('os.path.isfile', side_effect=os_path_isfile_patch):
        runner.run('flash')

    # The downloader must be constructed exactly once, with the default
    # context substituted when none was passed on the command line.
    expected_context = context if context else TEST_DEF_CONTEXT
    cpd.assert_called_once()
    assert cpd.call_args == call(node_id=node_id,
                                 can_context=expected_context,
                                 logger=runner.logger,
                                 program_number=program_number,
                                 sdo_retries=sdo_retries,
                                 sdo_timeout=sdo_timeout,
                                 download_buffer_size=download_buffer_size,
                                 block_transfer=block_transfer)

    mock.connect.assert_called_once()

    if confirm_only:
        # Confirm-only: the program is confirmed, but nothing is stopped,
        # erased, downloaded or restarted.
        mock.wait_for_flash_status_ok.assert_called_with(timeout)
        mock.swid.assert_called_once()
        mock.enter_pre_operational.assert_called_once()
        mock.zephyr_confirm_program.assert_called_once()
        mock.clear_program.assert_not_called()
        mock.stop_program.assert_not_called()
        mock.download.assert_not_called()
        mock.start_program.assert_not_called()
        mock.wait_for_bootup.assert_not_called()
    else:
        # Full flash: stop, clear, download and start the program, then
        # confirm it only when confirmation was requested.
        mock.enter_pre_operational.assert_called()
        mock.wait_for_flash_status_ok.assert_called_with(timeout)
        mock.swid.assert_called()
        mock.stop_program.assert_called_once()
        mock.clear_program.assert_called_once()
        mock.download.assert_called_once_with(RC_KERNEL_BIN)
        mock.start_program.assert_called_once()
        mock.wait_for_bootup.assert_called_once_with(timeout)
        if confirm:
            mock.zephyr_confirm_program.assert_called_once()
        else:
            mock.zephyr_confirm_program.assert_not_called()

    mock.disconnect.assert_called_once()