1# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
2#
3# SPDX-License-Identifier: Apache-2.0
4#
5# Licensed under the Apache License, Version 2.0 (the License); you may
6# not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an AS IS BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17import Lib.op_utils
18import math
19
20from tensorflow.lite.python.interpreter import Interpreter
21from tensorflow.lite.python.interpreter import OpResolverType
22import keras
23import numpy as np
24
def generate_data(tflite_fname, params):
    """Collect convolution test data from a TFLite model file.

    The model is loaded with the tflite_micro interpreter when
    ``params["tflite_generator"]`` is ``"json"``, otherwise with the
    TensorFlow Lite ``Interpreter`` using the ``BUILTIN_REF`` op resolver.
    Quantization parameters and output dimensions are read from the
    interpreter and used to derive padding, offsets and the per-channel
    output multipliers/shifts.

    Args:
        tflite_fname: Path of the .tflite model to load.
        params: Test-case parameters. Keys read here: "tflite_generator",
            "padding", "batch_size", "out_ch"; "arena_size" and "w_scale"
            on the json path; "generate_bias" on the other path; and, for
            "SAME" padding, "input_w", "input_h", "filter_x", "filter_y",
            "dilation_x", "dilation_y", "stride_x" and "stride_y". The dict is
            also handed to
            ``Lib.op_utils.generate_quantize_per_channel_multiplier``.

    Returns:
        A ``Lib.op_utils.Generated_data`` built from the generated parameters,
        tensors, scales, an empty effective-scales dict and the aliases.
    """
    tensors = {}
    effective_scales = {}
    scales = {}
    generated_params = {}

    # To be removed
    aliases = {
        "output_multiplier": "output_mult",
        "bias": "biases",
        "output": "output_ref",
    }

    if params["tflite_generator"] == "json":
        import tflite_micro  # Only tflite_micro interpreter supports int4 convolution

        # NOTE(review): the keyword is spelled "intrepreter_config" on purpose;
        # it is believed to match the tflite_micro API - confirm before renaming.
        interpreter = tflite_micro.runtime.Interpreter.from_file(
            model_path=str(tflite_fname),
            arena_size=params["arena_size"],
            intrepreter_config=tflite_micro.runtime.InterpreterConfig.kPreserveAllTensors)

        # Query each detail dict once instead of once per field.
        input_details = interpreter.get_input_details(0)
        output_details = interpreter.get_output_details(0)

        x_output = output_details["shape"][2]
        y_output = output_details["shape"][1]

        input_quant = input_details["quantization_parameters"]
        output_quant = output_details["quantization_parameters"]
        scales["input_scale"] = input_quant["scales"][0]
        scales["input_zero_point"] = input_quant["zero_points"][0]
        scales["output_scale"] = output_quant["scales"][0]
        scales["output_zero_point"] = output_quant["zero_points"][0]

        # Weight scales come from the parameters rather than from the model here.
        scales["scaling_factors"] = params["w_scale"]
    else:
        interpreter = Interpreter(str(tflite_fname), experimental_op_resolver_type=OpResolverType.BUILTIN_REF)
        interpreter.allocate_tensors()
        tensor_details = interpreter.get_tensor_details()

        # The positions of the filter and bias tensors depend on whether a
        # bias was generated.
        has_bias = params["generate_bias"]
        filter_index, bias_index = (1, 2) if has_bias else (2, 1)

        filter_layer = tensor_details[filter_index]
        scales["scaling_factors"] = filter_layer['quantization_parameters']['scales']

        input_details = interpreter.get_input_details()
        scales["input_scale"], scales["input_zero_point"] = input_details[0]['quantization']

        output_details = interpreter.get_output_details()
        scales["output_scale"], scales["output_zero_point"] = output_details[0]['quantization']

        x_output = output_details[0]['shape'][2]
        y_output = output_details[0]['shape'][1]

        tensors["weights"] = interpreter.get_tensor(filter_layer['index'])
        if has_bias:
            tensors["bias"] = interpreter.get_tensor(tensor_details[bias_index]['index'])
        else:
            tensors["bias"] = None

    pad_y, pad_x = _calculate_padding(x_output, y_output, params)

    generated_params["input_batches"] = params["batch_size"]
    generated_params["pad_x"] = pad_x
    generated_params["pad_y"] = pad_y
    generated_params["output_h"] = y_output
    generated_params["output_w"] = x_output
    generated_params["dst_size"] = x_output * y_output * params["out_ch"] * params["batch_size"]
    generated_params["input_offset"] = -scales["input_zero_point"]
    generated_params["output_offset"] = scales["output_zero_point"]

    per_channel_multiplier, per_channel_shift = Lib.op_utils.generate_quantize_per_channel_multiplier(params, scales)

    tensors["output_multiplier"] = np.array(per_channel_multiplier)
    tensors["output_shift"] = np.array(per_channel_shift)

    return Lib.op_utils.Generated_data(generated_params, tensors, scales, effective_scales, aliases)


def _calculate_padding(x_output, y_output, params):
    """Return ``(pad_y, pad_x)``, the top and left padding for the convolution.

    For "SAME" padding the total padding along an axis is what is needed for
    the dilated filter to produce the given output size at the given stride
    (never negative); half of it, rounded down, is reported. Any other padding
    mode yields ``(0, 0)``.
    """
    if params["padding"] != "SAME":
        return 0, 0

    # Take dilation into account.
    filter_x = (params["filter_x"] - 1) * params["dilation_x"] + 1
    filter_y = (params["filter_y"] - 1) * params["dilation_y"] + 1

    pad_along_width = max((x_output - 1) * params["stride_x"] + filter_x - params["input_w"], 0)
    pad_along_height = max((y_output - 1) * params["stride_y"] + filter_y - params["input_h"], 0)

    return pad_along_height // 2, pad_along_width // 2
136
137
class Op_conv(Lib.op_utils.Op_type):
    """Test-data generation hooks for the convolution operator.

    None of the methods take ``self``; they are written to be called on the
    class itself.
    """

    def get_shapes(params):
        """Fill in default parameters and return the tensor shapes for a test.

        ``params`` is updated in place with defaults for any missing stride,
        dilation, batch size, bias generation flag, activation range, bias
        range and weights range. On the json generator path the
        "json_template" entry is set as well.

        Args:
            params: Test-case parameters. Must contain "in_ch", "out_ch",
                "groups", "input_h", "input_w", "filter_x", "filter_y" and
                "tflite_generator"; "input_data_type" is needed when the
                activation range is not given.

        Returns:
            Dict with "input_tensor", "weight_shape" and "bias_shape", plus
            "representational_dataset" when the generator is not "json".

        Raises:
            RuntimeError: If "in_ch" or "out_ch" is not divisible by "groups".
        """
        shapes = {}

        # Common default parameters
        params.setdefault("stride_x", 1)
        params.setdefault("stride_y", 1)
        params.setdefault("dilation_x", 1)
        params.setdefault("dilation_y", 1)
        params.setdefault("batch_size", 1)
        params.setdefault("generate_bias", True)

        # These defaults are looked up lazily so that "input_data_type" is only
        # required when the activation range is actually missing.
        if "out_activation_min" not in params:
            params["out_activation_min"] = Lib.op_utils.get_dtype_min(params["input_data_type"])
        if "out_activation_max" not in params:
            params["out_activation_max"] = Lib.op_utils.get_dtype_max(params["input_data_type"])
        if "bias_min" not in params:
            params["bias_min"] = Lib.op_utils.get_dtype_min("int32_t")
        if "bias_max" not in params:
            params["bias_max"] = Lib.op_utils.get_dtype_max("int32_t")
        # NOTE(review): the weights range defaults to the int32_t limits, the
        # same as the bias range - confirm this is intended.
        if "weights_min" not in params:
            params["weights_min"] = Lib.op_utils.get_dtype_min("int32_t")
        if "weights_max" not in params:
            params["weights_max"] = Lib.op_utils.get_dtype_max("int32_t")

        in_ch = params["in_ch"]
        out_ch = params["out_ch"]
        groups = params["groups"]

        if in_ch % groups != 0:
            raise RuntimeError(f"ERROR: Input channels {in_ch} must be an even multiple of groups {groups}")
        if out_ch % groups != 0:
            raise RuntimeError(f"ERROR: Output channels {out_ch} must be an even multiple of groups {groups}")

        # Each filter only spans the input channels of its own group.
        filter_ch = in_ch // groups

        shapes["input_tensor"] = (params["batch_size"], params["input_h"], params["input_w"], in_ch)
        shapes["weight_shape"] = [params["filter_y"], params["filter_x"], filter_ch, out_ch]

        if params["tflite_generator"] == "json":
            params["json_template"] = "conv.json" if params["generate_bias"] else "conv_null_bias.json"
        else:
            shapes["representational_dataset"] = (params["batch_size"], params["input_h"], params["input_w"], in_ch)

        shapes["bias_shape"] = [out_ch] if params["generate_bias"] else []

        return shapes

    def generate_keras_model(shapes, params):
        """Build a Keras model holding one Conv2D layer with generated weights.

        Args:
            shapes: Shape dict; "weight_shape" is read, and "bias_shape" when a
                bias is generated.
            params: Test-case parameters describing the input size, filter,
                strides, dilation, padding, groups and value ranges.

        Returns:
            A ``keras.models.Sequential`` model made of an input layer and the
            convolution layer, with the layer weights (and bias, if enabled)
            set from ``Lib.op_utils.generate_tf_tensor``.
        """
        model = keras.models.Sequential()
        input_shape = (params["batch_size"], params["input_h"], params["input_w"], params["in_ch"])
        model.add(keras.layers.InputLayer(input_shape=input_shape[1:], batch_size=params["batch_size"]))

        conv_layer = keras.layers.Conv2D(params["out_ch"],
                                         kernel_size=(params["filter_y"], params["filter_x"]),
                                         strides=(params["stride_y"], params["stride_x"]),
                                         padding=params["padding"],
                                         input_shape=input_shape[1:],
                                         dilation_rate=(params["dilation_y"], params["dilation_x"]),
                                         groups=params["groups"],
                                         use_bias=params["generate_bias"])
        model.add(conv_layer)

        weights = Lib.op_utils.generate_tf_tensor(
            shapes["weight_shape"], params["weights_min"], params["weights_max"], decimals=8)

        if params["generate_bias"]:
            bias = Lib.op_utils.generate_tf_tensor(
                shapes["bias_shape"], params["bias_min"], params["bias_max"])
            conv_layer.set_weights([weights, bias])
        else:
            conv_layer.set_weights([weights])

        return model

    def generate_data_tflite(tflite_fname, params):
        """Return the data extracted from the model file by ``generate_data``."""
        return generate_data(tflite_fname, params)

    def post_model_update(tflite_path, generated_data, params):
        """Merge data read back from the model into ``generated_data``.

        The params, aliases and tensors produced by ``generate_data`` for
        ``tflite_path`` overwrite same-named entries of ``generated_data``,
        which is modified in place and returned. Scales and effective scales
        are not merged.
        """
        data = generate_data(tflite_path, params)

        generated_data.params |= data.params
        generated_data.aliases |= data.aliases
        generated_data.tensors |= data.tensors

        return generated_data

    def generate_data_json(shapes, params):
        """Generate random int4 weights, bias and parameters for a json model.

        ``params`` is updated in place with "bias_scale" and "bias_zp" (when a
        bias is generated), "w_zp", "w_scale" and "output_scale".

        Args:
            shapes: Shape dict; "weight_shape" and "bias_shape" are read.
            params: Test-case parameters. "weights_data_type" must be
                "int4_t".

        Returns:
            A ``Lib.op_utils.Generated_data`` holding the generated parameters,
            the "input_bias" and packed "input_weights" tensors, empty scales
            and effective scales, and the aliases for the two tensors.

        Raises:
            RuntimeError: If the weights data type is not "int4_t".
        """
        if params["weights_data_type"] != "int4_t":
            raise RuntimeError("Only int4 weights support json generated models")

        tensors = {}
        effective_scales = {}
        scales = {}
        generated_params = {}
        aliases = {}

        generated_params["input_batches"] = params["batch_size"]
        # NOTE(review): unlike generate_data(), this does not include the
        # output width and height - presumably replaced later via
        # post_model_update(); confirm.
        generated_params["dst_size"] = params["out_ch"] * params["batch_size"]

        def quantize_float_data(data=None, quantization_bit_range=8, quantization_type="affine", tf_tensor=False):
            """Quantize ``data`` and return ``(quantized array, scale, zero point)``.

            Returns None when ``data`` is None. ``quantization_type`` is
            "affine" or "symmetric" (case-insensitive); anything else raises
            RuntimeError. A scale of zero is replaced by 0.1.
            """
            if data is None:
                return None

            if tf_tensor:
                data = data.numpy()
            data_max = np.amax(data)
            data_min = np.amin(data)
            scheme = quantization_type.lower()

            if scheme == "affine":
                # The quantized range has to be able to represent 0.0.
                data_min = min(data_min, 0.0)
                data_max = max(data_max, 0.0)
                quant_min = -pow(2, quantization_bit_range - 1)
                quant_max = pow(2, quantization_bit_range - 1) - 1

                scale = (data_max - data_min) / (pow(2, quantization_bit_range) - 1)
                # Guard before the division below (all-zero data gives scale 0).
                scale = 0.1 if scale == 0 else scale

                # Map data_min onto quant_min, then clamp into the quantized range.
                zero_point = quant_min - round(data_min / scale)
                zero_point = min(max(zero_point, quant_min), quant_max)

            elif scheme == "symmetric":
                absolute_max = max(abs(data_min), abs(data_max))
                scale = absolute_max / (pow(2, quantization_bit_range - 1) - 1)
                scale = 0.1 if scale == 0 else scale
                zero_point = 0

            else:
                raise RuntimeError("Quantization scheme not supported")

            quantized_data = [(x // scale) + zero_point for x in data]
            return np.array(quantized_data), scale, zero_point

        if params["generate_bias"]:
            quant_bias, bias_scale, bias_zp = quantize_float_data(
                np.random.randint(
                    params["bias_min"],
                    params["bias_max"],
                    size=shapes["bias_shape"]),
                quantization_bit_range=8,
                quantization_type="symmetric",
                tf_tensor=False)  # the bias is a NumPy array, not a TF tensor

            params["bias_scale"] = [bias_scale] * params["out_ch"]
            params["bias_zp"] = [bias_zp] * params["out_ch"]

            tensors["input_bias"] = quant_bias
        else:
            tensors["input_bias"] = None

        params["w_zp"] = [0] * params["out_ch"]
        params["w_scale"] = np.random.uniform(0.001, 0.01, size=[params["out_ch"]]).tolist()

        params["output_scale"] = np.random.uniform(0.02, 0.06)

        if params["padding"] == "SAME":
            # TODO dilation with padding
            output_x = math.ceil(float(params["input_w"]) / float(params["stride_x"]))
            output_y = math.ceil(float(params["input_h"]) / float(params["stride_y"]))
        else:
            # Extra extent the filter gains from dilation.
            dilation_filter_x = (params["filter_x"] - 1) * (params["dilation_x"] - 1)
            dilation_filter_y = (params["filter_y"] - 1) * (params["dilation_y"] - 1)

            output_x = math.ceil(
                float(params["input_w"] - params["filter_x"] - dilation_filter_x + 1) / float(params["stride_x"]))
            output_y = math.ceil(
                float(params["input_h"] - params["filter_y"] - dilation_filter_y + 1) / float(params["stride_y"]))
        generated_params["output_h"] = output_y
        generated_params["output_w"] = output_x

        generated_params["input_offset"] = -params["input_zp"]
        generated_params["output_offset"] = params["output_zp"]

        aliases["input_bias"] = "biases"
        aliases["input_weights"] = "weights"

        weights = np.random.randint(
            params["weights_min"], params["weights_max"], size=shapes["weight_shape"])

        # Two 4-bit values share a byte, so pad an odd count with a zero.
        uneven = weights.size % 2
        if uneven:
            weights = np.append(weights, 0)

        # Pack pairs: first value in the low nibble, second in the high nibble.
        temp = np.reshape(weights, (weights.size // 2, 2)).astype(np.uint8)
        weights = 0xff & ((0xf0 & (temp[:, 1] << 4)) | (temp[:, 0] & 0xf))
        tensors["input_weights"] = weights

        return Lib.op_utils.Generated_data(generated_params, tensors, scales, effective_scales, aliases)
328