# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the License); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import Lib.op_utils
import math

from tensorflow.lite.python.interpreter import Interpreter
from tensorflow.lite.python.interpreter import OpResolverType
import tf_keras as keras
import numpy as np


class Op_conv(Lib.op_utils.Op_type):

    def get_shapes(params):
        shapes = {}

        # Common default parameters
        params["stride_x"] = 1 if "stride_x" not in params else params["stride_x"]
        params["stride_y"] = 1 if "stride_y" not in params else params["stride_y"]
        params["dilation_x"] = 1 if "dilation_x" not in params else params["dilation_x"]
        params["dilation_y"] = 1 if "dilation_y" not in params else params["dilation_y"]
        params["batch_size"] = 1 if "batch_size" not in params else params["batch_size"]
        params["generate_bias"] = True if "generate_bias" not in params else params["generate_bias"]
        if "out_activation_min" not in params:
            params["out_activation_min"] = Lib.op_utils.get_dtype_min(params["input_data_type"])
        if "out_activation_max" not in params:
            params["out_activation_max"] = Lib.op_utils.get_dtype_max(params["input_data_type"])
        if "bias_min" not in params:
            params["bias_min"] = Lib.op_utils.get_dtype_min("int32_t")
        if "bias_max" not in params:
            params["bias_max"] = Lib.op_utils.get_dtype_max("int32_t")
        if "weights_min" not in params:
            params["weights_min"] = Lib.op_utils.get_dtype_min("int32_t")
        if "weights_max" not in params:
            params["weights_max"] = Lib.op_utils.get_dtype_max("int32_t")

        in_ch = params["in_ch"]
        out_ch = params["out_ch"]
        groups = params["groups"]

        if in_ch % groups != 0:
            raise RuntimeError("ERROR: Input channels {} must be a multiple of groups {}".format(in_ch, groups))
        if out_ch % groups != 0:
            raise RuntimeError("ERROR: Output channels {} must be a multiple of groups {}".format(out_ch, groups))

        filter_ch = in_ch // groups

        shapes["input"] = (params["batch_size"], params["input_h"], params["input_w"], in_ch)
        shapes["weight_shape"] = [params["filter_y"], params["filter_x"], filter_ch, out_ch]

        if params["generate_bias"]:
            shapes["bias_shape"] = [out_ch]
        else:
            shapes["bias_shape"] = []

        shapes["representational_dataset"] = (params["batch_size"], params["input_h"], params["input_w"], in_ch)
        return shapes

    def generate_keras_model(shapes, params):

        model = keras.models.Sequential()
        input_shape = (params["batch_size"], params["input_h"], params["input_w"], params["in_ch"])
        model.add(keras.layers.InputLayer(input_shape=input_shape[1:], batch_size=params["batch_size"]))

        conv_layer = keras.layers.Conv2D(params["out_ch"],
                                         kernel_size=(params["filter_y"], params["filter_x"]),
                                         strides=(params["stride_y"], params["stride_x"]),
                                         padding=params["padding"],
                                         input_shape=input_shape[1:],
                                         dilation_rate=(params["dilation_y"], params["dilation_x"]),
                                         groups=params["groups"],
                                         use_bias=params["generate_bias"])
        model.add(conv_layer)

        weights = Lib.op_utils.generate_tf_tensor(
            shapes["weight_shape"], params["weights_min"], params["weights_max"], decimals=8)

        if params["generate_bias"]:
            bias = Lib.op_utils.generate_tf_tensor(
                shapes["bias_shape"], params["bias_min"], params["bias_max"])
            conv_layer.set_weights([weights, bias])
        else:
            conv_layer.set_weights([weights])

        return model

    def generate_data_tflite(tflite_fname, params):
        tensors = {}
        effective_scales = {}
        scales = {}
        generated_params = {}
        aliases = {}

        # To be removed
        aliases["output_multiplier"] = "output_mult"
        aliases["bias"] = "biases"
        aliases["output"] = "output_ref"

        interpreter = Interpreter(str(tflite_fname), experimental_op_resolver_type=OpResolverType.BUILTIN_REF)
        interpreter.allocate_tensors()
        tensor_details = interpreter.get_tensor_details()
        input_state = tensor_details[0]

        if params["generate_bias"]:
            filter_index = 1
            bias_index = 2
        else:
            filter_index = 2
            bias_index = 1

        filter_layer = tensor_details[filter_index]

        if params["generate_bias"]:
            bias_layer = tensor_details[bias_index]
        else:
            bias_layer = None
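        # Note: the filter/bias indices above assume the tensors in the
        # converted flatbuffer are ordered [input, filter, bias]; when no bias
        # is generated the filter tensor is assumed to land at index 2.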

        input_details = interpreter.get_input_details()
        (scales["input_scale"], scales["input_zero_point"]) = input_details[0]['quantization']

        output_details = interpreter.get_output_details()
        (scales["output_scale"], scales["output_zero_point"]) = output_details[0]['quantization']

        x_output = output_details[0]['shape'][2]
        y_output = output_details[0]['shape'][1]

        def calculate_padding(x_output, y_output, params):
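            # For "SAME" padding, TensorFlow pads so that the output size is
            # ceil(input / stride). The total padding per dimension is
            # max((output - 1) * stride + effective_filter_size - input, 0),
            # with the effective filter size accounting for dilation. When the
            # total is odd the extra pixel goes to the bottom/right, which is
            # what the "_with_offset" values capture.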
            x_input = params["input_w"]
            y_input = params["input_h"]

            if params["padding"] == "SAME":
                # Take dilation into account.
                filter_x = (params["filter_x"] - 1) * params["dilation_x"] + 1
                filter_y = (params["filter_y"] - 1) * params["dilation_y"] + 1

                pad_along_width = max((x_output - 1) * params["stride_x"] + filter_x - x_input, 0)
                pad_along_height = max((y_output - 1) * params["stride_y"] + filter_y - y_input, 0)

                pad_top = pad_along_height // 2
                pad_left = pad_along_width // 2
                pad_top_offset = pad_along_height % 2
                pad_left_offset = pad_along_width % 2

                pad_y_with_offset = pad_top + pad_top_offset
                pad_x_with_offset = pad_left + pad_left_offset
                pad_x = pad_left
                pad_y = pad_top
            else:
                pad_x = 0
                pad_y = 0
                pad_y_with_offset = 0
                pad_x_with_offset = 0

            return pad_y_with_offset, pad_x_with_offset, pad_y, pad_x

        pad_y_with_offset, pad_x_with_offset, pad_y, pad_x = calculate_padding(x_output, y_output, params)

        tensors["weights"] = interpreter.get_tensor(filter_layer['index'])

        if params["generate_bias"]:
            tensors["bias"] = interpreter.get_tensor(bias_layer['index'])
        else:
            tensors["bias"] = None

        scales["scaling_factors"] = filter_layer['quantization_parameters']['scales']

        def generate_quantize_per_channel_multiplier(params, scales):
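            # Per-channel requantization: each output channel has an effective
            # scale of input_scale * filter_scale[ch] / output_scale, which is
            # decomposed via frexp() into a Q31 fixed-point multiplier and a
            # power-of-two shift, so effective_scale ~= multiplier * 2^(shift - 31).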
            def quantize_scale(scale):
                significand, shift = math.frexp(scale)
                significand_q31 = round(significand * (1 << 31))
                return significand_q31, shift

            num_channels = params["out_ch"]
            per_channel_multiplier = []
            per_channel_shift = []

            if len(scales["scaling_factors"]) != num_channels:
                raise RuntimeError("Expected {} per-channel scaling factors, got {}".format(
                    num_channels, len(scales["scaling_factors"])))

            for i in range(num_channels):
                effective_output_scale = scales["input_scale"] * scales["scaling_factors"][i] / scales["output_scale"]
                (quantized_multiplier, shift) = quantize_scale(effective_output_scale)

                per_channel_multiplier.append(quantized_multiplier)
                per_channel_shift.append(shift)

            return per_channel_multiplier, per_channel_shift

        generated_params["input_batches"] = params["batch_size"]
        generated_params["pad_x"] = pad_x
        generated_params["pad_y"] = pad_y
        generated_params["output_h"] = y_output
        generated_params["output_w"] = x_output
        generated_params["dst_size"] = x_output * y_output * params["out_ch"] * params["batch_size"]
        generated_params["input_offset"] = -input_state['quantization_parameters']['zero_points'][0]
        generated_params["output_offset"] = output_details[0]['quantization'][1]

        per_channel_multiplier, per_channel_shift = generate_quantize_per_channel_multiplier(params, scales)

        tensors["output_multiplier"] = np.array(per_channel_multiplier)
        tensors["output_shift"] = np.array(per_channel_shift)

        return Lib.op_utils.Generated_data(generated_params, tensors, scales, effective_scales, aliases)
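
# Minimal usage sketch (hypothetical parameter values; the surrounding test
# generation framework is assumed to drive these calls and to convert the
# Keras model into the quantized .tflite file read by generate_data_tflite):
#
#   params = {"in_ch": 4, "out_ch": 8, "groups": 1, "input_h": 8, "input_w": 8,
#             "filter_y": 3, "filter_x": 3, "padding": "SAME",
#             "input_data_type": "int8_t"}
#   shapes = Op_conv.get_shapes(params)            # weight_shape == [3, 3, 4, 8]
#   model = Op_conv.generate_keras_model(shapes, params)
#   data = Op_conv.generate_data_tflite("conv.tflite", params)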