1# SPDX-FileCopyrightText: Copyright 2010-2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
2#
3# SPDX-License-Identifier: Apache-2.0
4#
5# Licensed under the Apache License, Version 2.0 (the License); you may
6# not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an AS IS BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17import os
18import sys
19import json
20import math
21import subprocess
22import keras
23
24from abc import ABC, abstractmethod
25from packaging import version
26
27import numpy as np
28import tensorflow as tf
29import tf_keras as keras
30
class TestSettings(ABC):
    """
    Abstract base class with the shared settings and helpers used to generate test data.

    Concrete subclasses must implement generate_data(). The helpers defined on this class create or
    reload randomized data tables below PREGEN, convert Keras models to TFLite files and write C
    headers below OUTDIR.
    """

    # This is the generated test data used by the test cases.
    OUTDIR = 'TestCases/TestData/'

    # This is input to the data generation. If everything or something is regenerated then it is overwritten,
    # so it always has the same data as the OUTDIR.
    # The purpose of the pregen is primarily debugging: it makes it possible to change a single parameter and see
    # how the output changes (or does not change) without regenerating all input data.
    # It is also convenient when testing changes in the script, to be able to run all test sets again.
    PREGEN = 'PregeneratedData/'

    # Integer limits. Only the int8/int16/int32 values are verified against NumPy in __init__.
    INT32_MAX = 2147483647
    INT32_MIN = -2147483648
    INT64_MAX = 9223372036854775807
    INT64_MIN = -9223372036854775808
    INT16_MAX = 32767
    INT16_MIN = -32768
    INT8_MAX = 127
    INT8_MIN = -128
    INT4_MAX = 7
    INT4_MIN = -8

    # Parsed with packaging.version. NOTE(review): not checked anywhere in the code visible in this class.
    REQUIRED_MINIMUM_TENSORFLOW_VERSION = version.parse("2.10")

    CLANG_FORMAT = 'clang-format-12 -i'  # For formatting generated headers.
57
58    def __init__(self,
59                 dataset,
60                 testtype,
61                 regenerate_weights,
62                 regenerate_input,
63                 regenerate_biases,
64                 schema_file,
65                 in_ch,
66                 out_ch,
67                 x_in,
68                 y_in,
69                 w_x,
70                 w_y,
71                 stride_x=1,
72                 stride_y=1,
73                 pad=False,
74                 randmin=np.iinfo(np.dtype('int8')).min,
75                 randmax=np.iinfo(np.dtype('int8')).max,
76                 batches=1,
77                 generate_bias=True,
78                 relu6=False,
79                 out_activation_min=None,
80                 out_activation_max=None,
81                 int16xint8=False,
82                 bias_min=np.iinfo(np.dtype('int32')).min,
83                 bias_max=np.iinfo(np.dtype('int32')).max,
84                 dilation_x=1,
85                 dilation_y=1,
86                 interpreter="tensorflow",
87                 int4_weights=False):
88
89        self.int4_weights = int4_weights
90
91        if self.INT8_MIN != np.iinfo(np.dtype('int8')).min or self.INT8_MAX != np.iinfo(np.dtype('int8')).max or \
92           self.INT16_MIN != np.iinfo(np.dtype('int16')).min or self.INT16_MAX != np.iinfo(np.dtype('int16')).max or \
93           self.INT32_MIN != np.iinfo(np.dtype('int32')).min or self.INT32_MAX != np.iinfo(np.dtype('int32')).max:
94            raise RuntimeError("Unexpected int min/max error")
95
96        self.use_tflite_micro_interpreter = False
97
98        if interpreter == "tflite_runtime":
99            from tflite_runtime.interpreter import Interpreter
100            from tflite_runtime.interpreter import OpResolverType
101            import tflite_runtime as tfl_runtime
102
103            revision = tfl_runtime.__git_version__
104            version = tfl_runtime.__version__
105            interpreter = "tflite_runtime"
106
107        elif interpreter == "tensorflow":
108            from tensorflow.lite.python.interpreter import Interpreter
109            from tensorflow.lite.python.interpreter import OpResolverType
110
111            revision = tf.__git_version__
112            version = tf.__version__
113            interpreter = "tensorflow"
114
115        elif interpreter == "tflite_micro":
116            from tensorflow.lite.python.interpreter import Interpreter
117            from tensorflow.lite.python.interpreter import OpResolverType
118
119            import tflite_micro
120            self.tflite_micro = tflite_micro
121            self.use_tflite_micro_interpreter = True
122
123            revision = None
124            version = tflite_micro.__version__
125            interpreter = "tflite_micro"
126        else:
127            raise RuntimeError(f"Invalid interpreter {interpreter}")
128
129        self.Interpreter = Interpreter
130        self.OpResolverType = OpResolverType
131
132        self.tensorflow_reference_version = (
133            "// Generated by {} using tensorflow version {} (Keras version {}).\n".format(
134                os.path.basename(__file__), tf.__version__, keras.__version__))
135
136        self.tensorflow_reference_version += ("// Interpreter from {} version {} and revision {}.\n".format(
137            interpreter, version, revision))
138
139        # Randomization interval
140        self.mins = randmin
141        self.maxs = randmax
142
143        self.bias_mins = bias_min
144        self.bias_maxs = bias_max
145
146        self.input_ch = in_ch
147        self.output_ch = out_ch
148        self.x_input = x_in
149        self.y_input = y_in
150        self.filter_x = w_x
151        self.filter_y = w_y
152        self.stride_x = stride_x
153        self.stride_y = stride_y
154        self.dilation_x = dilation_x
155        self.dilation_y = dilation_y
156        self.batches = batches
157        self.test_type = testtype
158        self.has_padding = pad
159
160        self.is_int16xint8 = int16xint8
161
162        if relu6:
163            self.out_activation_max = 6
164            self.out_activation_min = 0
165        else:
166            if out_activation_min is not None:
167                self.out_activation_min = out_activation_min
168            else:
169                self.out_activation_min = self.INT16_MIN if self.is_int16xint8 else self.INT8_MIN
170            if out_activation_max is not None:
171                self.out_activation_max = out_activation_max
172            else:
173                self.out_activation_max = self.INT16_MAX if self.is_int16xint8 else self.INT8_MAX
174
175        # Bias is optional.
176        self.generate_bias = generate_bias
177
178        self.generated_header_files = []
179        self.pregenerated_data_dir = self.PREGEN
180
181        self.config_data = "config_data.h"
182
183        self.testdataset = dataset
184
185        self.kernel_table_file = self.pregenerated_data_dir + self.testdataset + '/' + 'kernel.txt'
186        self.inputs_table_file = self.pregenerated_data_dir + self.testdataset + '/' + 'input.txt'
187        self.bias_table_file = self.pregenerated_data_dir + self.testdataset + '/' + 'bias.txt'
188
189        if self.has_padding:
190            self.padding = 'SAME'
191        else:
192            self.padding = 'VALID'
193
194        self.regenerate_new_weights = regenerate_weights
195        self.regenerate_new_input = regenerate_input
196        self.regenerate_new_bias = regenerate_biases
197        self.schema_file = schema_file
198
199        self.headers_dir = self.OUTDIR + self.testdataset + '/'
200        os.makedirs(self.headers_dir, exist_ok=True)
201
202        self.model_path = "{}model_{}".format(self.headers_dir, self.testdataset)
203        self.model_path_tflite = self.model_path + '.tflite'
204
205        self.input_data_file_prefix = "input"
206        self.weight_data_file_prefix = "weights"
207        self.bias_data_file_prefix = "biases"
208        self.output_data_file_prefix = "output_ref"
209
210    def save_multiple_dim_array_in_txt(self, file, data):
211        header = ','.join(map(str, data.shape))
212        np.savetxt(file, data.reshape(-1, data.shape[-1]), header=header, delimiter=',')
213
214    def load_multiple_dim_array_from_txt(self, file):
215        with open(file) as f:
216            shape = list(map(int, next(f)[1:].split(',')))
217            data = np.genfromtxt(f, delimiter=',').reshape(shape)
218        return data.astype(np.float32)
219
220    def convert_tensor_np(self, tensor_in, converter, *qminmax):
221        w = tensor_in.numpy()
222        shape = w.shape
223        w = w.ravel()
224        if len(qminmax) == 2:
225            fw = converter(w, qminmax[0], qminmax[1])
226        else:
227            fw = converter(w)
228        fw.shape = shape
229        return tf.convert_to_tensor(fw)
230
231    def convert_tensor(self, tensor_in, converter, *qminmax):
232        w = tensor_in.numpy()
233        shape = w.shape
234        w = w.ravel()
235        normal = np.array(w)
236        float_normal = []
237
238        for i in normal:
239            if len(qminmax) == 2:
240                float_normal.append(converter(i, qminmax[0], qminmax[1]))
241            else:
242                float_normal.append(converter(i))
243
244        np_float_array = np.asarray(float_normal)
245        np_float_array.shape = shape
246
247        return tf.convert_to_tensor(np_float_array)
248
249    def get_randomized_data(self, dims, npfile, regenerate, decimals=0, minrange=None, maxrange=None):
250        if not minrange:
251            minrange = self.mins
252        if not maxrange:
253            maxrange = self.maxs
254        if not os.path.exists(npfile) or regenerate:
255            regendir = os.path.dirname(npfile)
256            os.makedirs(regendir, exist_ok=True)
257            if decimals == 0:
258                data = tf.Variable(tf.random.uniform(dims, minval=minrange, maxval=maxrange, dtype=tf.dtypes.int64))
259                data = tf.cast(data, dtype=tf.float32)
260            else:
261                data = tf.Variable(tf.random.uniform(dims, minval=minrange, maxval=maxrange, dtype=tf.dtypes.float32))
262                data = np.around(data.numpy(), decimals)
263                data = tf.convert_to_tensor(data)
264
265            print("Saving data to {}".format(npfile))
266            self.save_multiple_dim_array_in_txt(npfile, data.numpy())
267        else:
268            print("Loading data from {}".format(npfile))
269            data = tf.convert_to_tensor(self.load_multiple_dim_array_from_txt(npfile))
270        return data
271
272    def get_randomized_input_data(self, input_data, input_shape=None):
273        # Generate or load saved input data unless hardcoded data provided
274        if input_shape is None:
275            input_shape = [self.batches, self.y_input, self.x_input, self.input_ch]
276        if input_data is not None:
277            input_data = tf.reshape(input_data, input_shape)
278        else:
279            input_data = self.get_randomized_data(input_shape,
280                                                  self.inputs_table_file,
281                                                  regenerate=self.regenerate_new_input)
282        return input_data
283
284    def get_randomized_bias_data(self, biases):
285        # Generate or load saved bias data unless hardcoded data provided
286        if not self.generate_bias:
287            biases = tf.reshape(np.full([self.output_ch], 0), [self.output_ch])
288        elif biases is not None:
289            biases = tf.reshape(biases, [self.output_ch])
290        else:
291            biases = self.get_randomized_data([self.output_ch],
292                                              self.bias_table_file,
293                                              regenerate=self.regenerate_new_bias,
294                                              minrange=self.bias_mins,
295                                              maxrange=self.bias_maxs)
296        return biases
297
298    def format_output_file(self, file):
299        command_list = self.CLANG_FORMAT.split(' ')
300        command_list.append(file)
301        try:
302            process = subprocess.run(command_list)
303            if process.returncode != 0:
304                print(f"ERROR: {command_list = }")
305                sys.exit(1)
306        except Exception as e:
307            raise RuntimeError(f"{e} from: {command_list = }")
308
309    def write_c_header_wrapper(self):
310        filename = "test_data.h"
311        filepath = self.headers_dir + filename
312
313        print("Generating C header wrapper {}...".format(filepath))
314        with open(filepath, 'w+') as f:
315            f.write(self.tensorflow_reference_version)
316            while len(self.generated_header_files) > 0:
317                f.write('#include "{}"\n'.format(self.generated_header_files.pop()))
318        self.format_output_file(filepath)
319
320    def write_common_config(self, f, prefix):
321        """
322        Shared by conv/depthwise_conv and pooling
323        """
324        f.write("#define {}_FILTER_X {}\n".format(prefix, self.filter_x))
325        f.write("#define {}_FILTER_Y {}\n".format(prefix, self.filter_y))
326        f.write("#define {}_STRIDE_X {}\n".format(prefix, self.stride_x))
327        f.write("#define {}_STRIDE_Y {}\n".format(prefix, self.stride_y))
328        f.write("#define {}_PAD_X {}\n".format(prefix, self.pad_x))
329        f.write("#define {}_PAD_Y {}\n".format(prefix, self.pad_y))
330        f.write("#define {}_OUTPUT_W {}\n".format(prefix, self.x_output))
331        f.write("#define {}_OUTPUT_H {}\n".format(prefix, self.y_output))
332
333    def write_c_common_header(self, f):
334        f.write(self.tensorflow_reference_version)
335        f.write("#pragma once\n")
336
337    def write_c_config_header(self, write_common_parameters=True) -> None:
338        filename = self.config_data
339
340        self.generated_header_files.append(filename)
341        filepath = self.headers_dir + filename
342
343        prefix = self.testdataset.upper()
344
345        print("Writing C header with config data {}...".format(filepath))
346        with open(filepath, "w+") as f:
347            self.write_c_common_header(f)
348            if (write_common_parameters):
349                f.write("#define {}_OUT_CH {}\n".format(prefix, self.output_ch))
350                f.write("#define {}_IN_CH {}\n".format(prefix, self.input_ch))
351                f.write("#define {}_INPUT_W {}\n".format(prefix, self.x_input))
352                f.write("#define {}_INPUT_H {}\n".format(prefix, self.y_input))
353                f.write("#define {}_DST_SIZE {}\n".format(
354                    prefix, self.x_output * self.y_output * self.output_ch * self.batches))
355                f.write("#define {}_INPUT_SIZE {}\n".format(prefix, self.x_input * self.y_input * self.input_ch))
356                f.write("#define {}_OUT_ACTIVATION_MIN {}\n".format(prefix, self.out_activation_min))
357                f.write("#define {}_OUT_ACTIVATION_MAX {}\n".format(prefix, self.out_activation_max))
358                f.write("#define {}_INPUT_BATCHES {}\n".format(prefix, self.batches))
359        self.format_output_file(filepath)
360
361    def get_data_file_name_info(self, name_prefix) -> (str, str):
362        filename = name_prefix + "_data.h"
363        filepath = self.headers_dir + filename
364        return filename, filepath
365
366    def generate_c_array(self, name, array, datatype="int8_t", const="const ", pack=False) -> None:
367        w = None
368
369        if type(array) is list:
370            w = array
371            size = len(array)
372        elif type(array) is np.ndarray:
373            w = array
374            w = w.ravel()
375            size = w.size
376        else:
377            w = array.numpy()
378            w = w.ravel()
379            size = tf.size(array)
380
381        if pack:
382            size = (size // 2) + (size % 2)
383
384        filename, filepath = self.get_data_file_name_info(name)
385        self.generated_header_files.append(filename)
386
387        print("Generating C header {}...".format(filepath))
388        with open(filepath, "w+") as f:
389            self.write_c_common_header(f)
390            f.write("#include <stdint.h>\n\n")
391            if size > 0:
392                f.write(const + datatype + " " + self.testdataset + '_' + name + "[%d] =\n{\n" % size)
393                for i in range(size - 1):
394                    f.write("  %d,\n" % w[i])
395                f.write("  %d\n" % w[size - 1])
396                f.write("};\n")
397            else:
398                f.write(const + datatype + " *" + self.testdataset + '_' + name + " = NULL;\n")
399        self.format_output_file(filepath)
400
401    def calculate_padding(self, x_output, y_output, x_input, y_input):
402        if self.has_padding:
403            # Take dilation into account.
404            filter_x = (self.filter_x - 1) * self.dilation_x + 1
405            filter_y = (self.filter_y - 1) * self.dilation_y + 1
406
407            pad_along_width = max((x_output - 1) * self.stride_x + filter_x - x_input, 0)
408            pad_along_height = max((y_output - 1) * self.stride_y + filter_y - y_input, 0)
409
410            pad_top = pad_along_height // 2
411            pad_left = pad_along_width // 2
412            pad_top_offset = pad_along_height % 2
413            pad_left_offset = pad_along_width % 2
414
415            self.pad_y_with_offset = pad_top + pad_top_offset
416            self.pad_x_with_offset = pad_left + pad_left_offset
417            self.pad_x = pad_left
418            self.pad_y = pad_top
419        else:
420            self.pad_x = 0
421            self.pad_y = 0
422            self.pad_y_with_offset = 0
423            self.pad_x_with_offset = 0
424
    @abstractmethod
    def generate_data(self, input_data=None, weights=None, biases=None) -> None:
        '''
        Abstract hook: must be overridden by every concrete subclass.

        NOTE(review): the optional arguments presumably let callers pass hardcoded data instead of
        generated data - confirm against the subclasses.
        '''
428
429    def quantize_scale(self, scale):
430        significand, shift = math.frexp(scale)
431        significand_q31 = round(significand * (1 << 31))
432        return significand_q31, shift
433
434    def get_calib_data_func(self, n_inputs, shape):
435
436        def representative_data_gen():
437            representative_testsets = []
438            if n_inputs > 0:
439                for i in range(n_inputs):
440                    representative_testsets.append(np.ones(shape, dtype=np.float32))
441                yield representative_testsets
442            else:
443                raise RuntimeError("Invalid number of representative test sets: {}. Must be more than 0".format(
444                    self.test_type))
445
446        return representative_data_gen
447
448    def convert_and_interpret(self, model, inttype, input_data=None, dataset_shape=None):
449        """
450        Compile and convert a model to Tflite format, run interpreter and allocate tensors.
451        """
452        self.convert_model(model, inttype, dataset_shape)
453        return self.interpret_model(input_data, inttype)
454
455    def convert_model(self, model, inttype, dataset_shape=None, int16x8_int32bias=False):
456        model.compile(loss=keras.losses.categorical_crossentropy,
457                      optimizer=keras.optimizers.Adam(),
458                      metrics=['accuracy'])
459        n_inputs = len(model.inputs)
460
461        if dataset_shape:
462            representative_dataset_shape = dataset_shape
463        else:
464            representative_dataset_shape = (self.batches, self.y_input, self.x_input, self.input_ch)
465
466        converter = tf.lite.TFLiteConverter.from_keras_model(model)
467
468        representative_dataset = self.get_calib_data_func(n_inputs, representative_dataset_shape)
469
470        converter.optimizations = [tf.lite.Optimize.DEFAULT]
471        converter.representative_dataset = representative_dataset
472        if self.is_int16xint8:
473            if int16x8_int32bias:
474                converter._experimental_full_integer_quantization_bias_type = tf.int32
475            converter.target_spec.supported_ops = [
476                tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8
477            ]
478        else:
479            converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
480        converter.inference_input_type = inttype
481        converter.inference_output_type = inttype
482        tflite_model = converter.convert()
483
484        os.makedirs(os.path.dirname(self.model_path_tflite), exist_ok=True)
485        with open(self.model_path_tflite, "wb") as model:
486            model.write(tflite_model)
487
488    def interpret_model(self, input_data, inttype):
489        interpreter = self.Interpreter(model_path=str(self.model_path_tflite),
490                                       experimental_op_resolver_type=self.OpResolverType.BUILTIN_REF)
491        interpreter.allocate_tensors()
492
493        output_details = interpreter.get_output_details()
494        (self.output_scale, self.output_zero_point) = output_details[0]['quantization']
495
496        if input_data is not None:
497            input_details = interpreter.get_input_details()
498            (self.input_scale, self.input_zero_point) = input_details[0]['quantization']
499
500            # Set input tensors
501            interpreter.set_tensor(input_details[0]["index"], tf.cast(input_data, inttype))
502
503        return interpreter
504
505    # TODO: make it a more generic function and remove reference to svdf specific names
506    def generate_json_from_template(self,
507                                    weights_feature_data=None,
508                                    weights_time_data=None,
509                                    bias_data=None,
510                                    int8_time_weights=False,
511                                    bias_buffer=3):
512        """
513        Takes a json template and parameters as input and creates a new json file.
514        """
515        generated_json_file = self.model_path + '.json'
516
517        with open(self.json_template, 'r') as in_file, open(generated_json_file, 'w') as out_file:
518            # Update shapes, scales and zero points
519            data = in_file.read()
520            for item, to_replace in self.json_replacements.items():
521                data = data.replace(item, str(to_replace))
522
523            data = json.loads(data)
524
525            # Update weights and bias data
526            if weights_feature_data is not None:
527                w_1_buffer_index = 1
528                data["buffers"][w_1_buffer_index]["data"] = self.to_bytes(weights_feature_data.numpy().ravel(), 1)
529            if weights_time_data is not None:
530                w_2_buffer_index = 2
531                if int8_time_weights:
532                    data["buffers"][w_2_buffer_index]["data"] = self.to_bytes(weights_time_data.numpy().ravel(), 1)
533                else:
534                    data["buffers"][w_2_buffer_index]["data"] = self.to_bytes(weights_time_data.numpy().ravel(), 2)
535
536            if bias_data is not None:
537                bias_buffer_index = bias_buffer
538                data["buffers"][bias_buffer_index]["data"] = self.to_bytes(bias_data.numpy().ravel(), 4)
539
540            json.dump(data, out_file, indent=2)
541
542        return generated_json_file
543
544    def flatc_generate_tflite(self, json_input, schema):
545        flatc = 'flatc'
546        if schema is None:
547            raise RuntimeError("A schema file is required.")
548        command = "{} -o {} -c -b {} {}".format(flatc, self.headers_dir, schema, json_input)
549        command_list = command.split(' ')
550        try:
551            process = subprocess.run(command_list)
552            if process.returncode != 0:
553                print(f"ERROR: {command = }")
554                sys.exit(1)
555        except Exception as e:
556            raise RuntimeError(f"{e} from: {command = }. Did you install flatc?")
557
558    def to_bytes(self, tensor_data, type_size) -> bytes:
559        result_bytes = []
560
561        if type_size == 1:
562            tensor_type = np.uint8
563        elif type_size == 2:
564            tensor_type = np.uint16
565        elif type_size == 4:
566            tensor_type = np.uint32
567        else:
568            raise RuntimeError("Size not supported: {}".format(type_size))
569
570        for val in tensor_data:
571            for byte in int(tensor_type(val)).to_bytes(type_size, 'little'):
572                result_bytes.append(byte)
573
574        return result_bytes
575