# SPDX-FileCopyrightText: Copyright 2010-2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the License); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import json
import math
import subprocess
import keras

from abc import ABC, abstractmethod
from packaging import version

import numpy as np
import tensorflow as tf
# NOTE: this rebinds the name 'keras' imported above; all uses below refer to tf_keras.
import tf_keras as keras


class TestSettings(ABC):
    """Common settings and helpers for generating reference test data.

    Holds the layer geometry, randomization ranges and file locations, and provides helpers to
    generate or load input data, convert a Keras model to TFLite, run an interpreter on it and
    emit the results as C headers. Concrete generators must implement generate_data().
    """

    # This is the generated test data used by the test cases.
    OUTDIR = 'TestCases/TestData/'

    # This is input to the data generation. If everything or something is regenerated then it is overwritten.
    # So it always has the same data as the OUTDIR.
    # The purpose of the pregen is primarily for debugging, as it makes it possible to change a single parameter
    # and see how the output changes (or does not change), without regenerating all input data.
    # It is also convenient when testing changes in the script, to be able to run all test sets again.
    PREGEN = 'PregeneratedData/'

    INT32_MAX = 2147483647
    INT32_MIN = -2147483648
    INT64_MAX = 9223372036854775807
    INT64_MIN = -9223372036854775808
    INT16_MAX = 32767
    INT16_MIN = -32768
    INT8_MAX = 127
    INT8_MIN = -128
    INT4_MAX = 7
    INT4_MIN = -8

    REQUIRED_MINIMUM_TENSORFLOW_VERSION = version.parse("2.10")

    CLANG_FORMAT = 'clang-format-12 -i'  # For formatting generated headers.

    def __init__(self,
                 dataset,
                 testtype,
                 regenerate_weights,
                 regenerate_input,
                 regenerate_biases,
                 schema_file,
                 in_ch,
                 out_ch,
                 x_in,
                 y_in,
                 w_x,
                 w_y,
                 stride_x=1,
                 stride_y=1,
                 pad=False,
                 randmin=np.iinfo(np.dtype('int8')).min,
                 randmax=np.iinfo(np.dtype('int8')).max,
                 batches=1,
                 generate_bias=True,
                 relu6=False,
                 out_activation_min=None,
                 out_activation_max=None,
                 int16xint8=False,
                 bias_min=np.iinfo(np.dtype('int32')).min,
                 bias_max=np.iinfo(np.dtype('int32')).max,
                 dilation_x=1,
                 dilation_y=1,
                 interpreter="tensorflow",
                 int4_weights=False):
        """Store the test configuration and select the interpreter implementation.

        dataset names the test set; it is used for the output/pregenerated directories and as
        prefix of generated C symbols. interpreter must be one of "tensorflow", "tflite_runtime"
        or "tflite_micro". The remaining arguments are stored as attributes describing layer
        geometry, randomization ranges, activation limits and regeneration flags.

        Creates the output header directory as a side effect.

        Raises:
            RuntimeError: if the integer limit constants disagree with NumPy or the interpreter
                name is not recognised.
        """
        self.int4_weights = int4_weights

        # Sanity check that the hard-coded limits agree with NumPy's view of the types.
        expected_limits = (
            (self.INT8_MIN, self.INT8_MAX, 'int8'),
            (self.INT16_MIN, self.INT16_MAX, 'int16'),
            (self.INT32_MIN, self.INT32_MAX, 'int32'),
        )
        for limit_min, limit_max, dtype_name in expected_limits:
            info = np.iinfo(np.dtype(dtype_name))
            if limit_min != info.min or limit_max != info.max:
                raise RuntimeError("Unexpected int min/max error")

        self.use_tflite_micro_interpreter = False

        # The interpreter packages are imported lazily so that only the selected one is required.
        # The local is named interpreter_version so it does not shadow the module-level
        # 'version' imported from packaging.
        if interpreter == "tflite_runtime":
            from tflite_runtime.interpreter import Interpreter
            from tflite_runtime.interpreter import OpResolverType
            import tflite_runtime as tfl_runtime

            revision = tfl_runtime.__git_version__
            interpreter_version = tfl_runtime.__version__

        elif interpreter == "tensorflow":
            from tensorflow.lite.python.interpreter import Interpreter
            from tensorflow.lite.python.interpreter import OpResolverType

            revision = tf.__git_version__
            interpreter_version = tf.__version__

        elif interpreter == "tflite_micro":
            from tensorflow.lite.python.interpreter import Interpreter
            from tensorflow.lite.python.interpreter import OpResolverType

            import tflite_micro
            self.tflite_micro = tflite_micro
            self.use_tflite_micro_interpreter = True

            revision = None
            interpreter_version = tflite_micro.__version__
        else:
            raise RuntimeError(f"Invalid interpreter {interpreter}")

        self.Interpreter = Interpreter
        self.OpResolverType = OpResolverType

        # Banner written at the top of every generated header.
        self.tensorflow_reference_version = (
            "// Generated by {} using tensorflow version {} (Keras version {}).\n".format(
                os.path.basename(__file__), tf.__version__, keras.__version__))

        self.tensorflow_reference_version += ("// Interpreter from {} version {} and revision {}.\n".format(
            interpreter, interpreter_version, revision))

        # Randomization interval
        self.mins = randmin
        self.maxs = randmax

        self.bias_mins = bias_min
        self.bias_maxs = bias_max

        self.input_ch = in_ch
        self.output_ch = out_ch
        self.x_input = x_in
        self.y_input = y_in
        self.filter_x = w_x
        self.filter_y = w_y
        self.stride_x = stride_x
        self.stride_y = stride_y
        self.dilation_x = dilation_x
        self.dilation_y = dilation_y
        self.batches = batches
        self.test_type = testtype
        self.has_padding = pad

        self.is_int16xint8 = int16xint8

        if relu6:
            self.out_activation_max = 6
            self.out_activation_min = 0
        else:
            # Explicit limits win; otherwise fall back to the full range of the activation type.
            if out_activation_min is not None:
                self.out_activation_min = out_activation_min
            else:
                self.out_activation_min = self.INT16_MIN if self.is_int16xint8 else self.INT8_MIN
            if out_activation_max is not None:
                self.out_activation_max = out_activation_max
            else:
                self.out_activation_max = self.INT16_MAX if self.is_int16xint8 else self.INT8_MAX

        # Bias is optional.
        self.generate_bias = generate_bias

        self.generated_header_files = []
        self.pregenerated_data_dir = self.PREGEN

        self.config_data = "config_data.h"

        self.testdataset = dataset

        self.kernel_table_file = self.pregenerated_data_dir + self.testdataset + '/' + 'kernel.txt'
        self.inputs_table_file = self.pregenerated_data_dir + self.testdataset + '/' + 'input.txt'
        self.bias_table_file = self.pregenerated_data_dir + self.testdataset + '/' + 'bias.txt'

        self.padding = 'SAME' if self.has_padding else 'VALID'

        self.regenerate_new_weights = regenerate_weights
        self.regenerate_new_input = regenerate_input
        self.regenerate_new_bias = regenerate_biases
        self.schema_file = schema_file

        self.headers_dir = self.OUTDIR + self.testdataset + '/'
        os.makedirs(self.headers_dir, exist_ok=True)

        self.model_path = "{}model_{}".format(self.headers_dir, self.testdataset)
        self.model_path_tflite = self.model_path + '.tflite'

        self.input_data_file_prefix = "input"
        self.weight_data_file_prefix = "weights"
        self.bias_data_file_prefix = "biases"
        self.output_data_file_prefix = "output_ref"

    def save_multiple_dim_array_in_txt(self, file, data):
        """Save an N-dimensional array as 2-D text, recording its shape in the header line."""
        header = ','.join(map(str, data.shape))
        np.savetxt(file, data.reshape(-1, data.shape[-1]), header=header, delimiter=',')

    def load_multiple_dim_array_from_txt(self, file):
        """Load an array written by save_multiple_dim_array_in_txt and return it as float32."""
        with open(file) as f:
            # The first line is the header written by np.savetxt; drop its first character
            # (the comment marker) before parsing the shape.
            shape = list(map(int, next(f)[1:].split(',')))
            data = np.genfromtxt(f, delimiter=',').reshape(shape)
        return data.astype(np.float32)

    def convert_tensor_np(self, tensor_in, converter, *qminmax):
        """Apply a vectorized converter to the flattened tensor and return a tensor of the same shape.

        converter is called with the flattened NumPy array, plus the two values in qminmax when
        exactly two are given.
        """
        values = tensor_in.numpy()
        shape = values.shape
        flat = values.ravel()
        if len(qminmax) == 2:
            converted = converter(flat, qminmax[0], qminmax[1])
        else:
            converted = converter(flat)
        return tf.convert_to_tensor(np.asarray(converted).reshape(shape))

    def convert_tensor(self, tensor_in, converter, *qminmax):
        """Apply a scalar converter to every element and return a tensor of the same shape.

        converter is called once per element, plus the two values in qminmax when exactly two
        are given.
        """
        values = tensor_in.numpy()
        shape = values.shape
        # Any other number of extra arguments is ignored, as before.
        extra_args = qminmax if len(qminmax) == 2 else ()
        converted = [converter(value, *extra_args) for value in values.ravel()]
        return tf.convert_to_tensor(np.asarray(converted).reshape(shape))

    def get_randomized_data(self, dims, npfile, regenerate, decimals=0, minrange=None, maxrange=None):
        """Return random data of shape dims, generating and saving it or loading it from npfile.

        Data is generated (and written to npfile) when the file does not exist or regenerate is
        set; otherwise it is loaded from npfile. With decimals == 0 integer values are drawn and
        cast to float32, otherwise float32 values rounded to the given number of decimals.
        minrange/maxrange default to the instance randomization interval when None.
        """
        # Compare against None explicitly so that a bound of 0 is honoured.
        if minrange is None:
            minrange = self.mins
        if maxrange is None:
            maxrange = self.maxs
        if not os.path.exists(npfile) or regenerate:
            regendir = os.path.dirname(npfile)
            os.makedirs(regendir, exist_ok=True)
            if decimals == 0:
                data = tf.Variable(tf.random.uniform(dims, minval=minrange, maxval=maxrange, dtype=tf.dtypes.int64))
                data = tf.cast(data, dtype=tf.float32)
            else:
                data = tf.Variable(tf.random.uniform(dims, minval=minrange, maxval=maxrange, dtype=tf.dtypes.float32))
                data = np.around(data.numpy(), decimals)
                data = tf.convert_to_tensor(data)

            print("Saving data to {}".format(npfile))
            self.save_multiple_dim_array_in_txt(npfile, data.numpy())
        else:
            print("Loading data from {}".format(npfile))
            data = tf.convert_to_tensor(self.load_multiple_dim_array_from_txt(npfile))
        return data

    def get_randomized_input_data(self, input_data, input_shape=None):
        """Return input data reshaped to input_shape, generating or loading it when none is given."""
        # Generate or load saved input data unless hardcoded data provided
        if input_shape is None:
            input_shape = [self.batches, self.y_input, self.x_input, self.input_ch]
        if input_data is not None:
            return tf.reshape(input_data, input_shape)
        return self.get_randomized_data(input_shape, self.inputs_table_file, regenerate=self.regenerate_new_input)

    def get_randomized_bias_data(self, biases):
        """Return bias data of length output_ch: zeros, the provided values, or generated/loaded data."""
        # Generate or load saved bias data unless hardcoded data provided
        if not self.generate_bias:
            return tf.reshape(np.full([self.output_ch], 0), [self.output_ch])
        if biases is not None:
            return tf.reshape(biases, [self.output_ch])
        return self.get_randomized_data([self.output_ch],
                                        self.bias_table_file,
                                        regenerate=self.regenerate_new_bias,
                                        minrange=self.bias_mins,
                                        maxrange=self.bias_maxs)

    def format_output_file(self, file):
        """Run the configured clang-format command in place on file.

        Exits the process with status 1 if the formatter returns a non-zero code.

        Raises:
            RuntimeError: if the formatter could not be started.
        """
        command_list = self.CLANG_FORMAT.split(' ')
        command_list.append(file)
        try:
            process = subprocess.run(command_list)
        except Exception as e:
            raise RuntimeError(f"{e} from: {command_list = }") from e
        if process.returncode != 0:
            print(f"ERROR: {command_list = }")
            sys.exit(1)

    def write_c_header_wrapper(self):
        """Write test_data.h including every generated header, emptying the list of generated headers."""
        filename = "test_data.h"
        filepath = self.headers_dir + filename

        print("Generating C header wrapper {}...".format(filepath))
        with open(filepath, 'w+') as f:
            f.write(self.tensorflow_reference_version)
            # Headers are consumed from the end of the list, i.e. included in reverse order of generation.
            while self.generated_header_files:
                f.write('#include "{}"\n'.format(self.generated_header_files.pop()))
        self.format_output_file(filepath)

    def write_common_config(self, f, prefix):
        """
        Shared by conv/depthwise_conv and pooling
        """
        # pad_x/pad_y and x_output/y_output are not set in __init__; they must have been set
        # (e.g. by calculate_padding and the concrete generator) before this is called.
        common_defines = (
            ("FILTER_X", self.filter_x),
            ("FILTER_Y", self.filter_y),
            ("STRIDE_X", self.stride_x),
            ("STRIDE_Y", self.stride_y),
            ("PAD_X", self.pad_x),
            ("PAD_Y", self.pad_y),
            ("OUTPUT_W", self.x_output),
            ("OUTPUT_H", self.y_output),
        )
        for suffix, value in common_defines:
            f.write("#define {}_{} {}\n".format(prefix, suffix, value))

    def write_c_common_header(self, f):
        """Write the generator banner and include guard pragma to an open header file."""
        f.write(self.tensorflow_reference_version)
        f.write("#pragma once\n")

    def write_c_config_header(self, write_common_parameters=True) -> None:
        """Write the config header for this test set and register it as a generated header.

        When write_common_parameters is false only the banner and pragma are written.
        """
        filename = self.config_data

        self.generated_header_files.append(filename)
        filepath = self.headers_dir + filename

        prefix = self.testdataset.upper()

        print("Writing C header with config data {}...".format(filepath))
        with open(filepath, "w+") as f:
            self.write_c_common_header(f)
            if write_common_parameters:
                f.write("#define {}_OUT_CH {}\n".format(prefix, self.output_ch))
                f.write("#define {}_IN_CH {}\n".format(prefix, self.input_ch))
                f.write("#define {}_INPUT_W {}\n".format(prefix, self.x_input))
                f.write("#define {}_INPUT_H {}\n".format(prefix, self.y_input))
                f.write("#define {}_DST_SIZE {}\n".format(
                    prefix, self.x_output * self.y_output * self.output_ch * self.batches))
                f.write("#define {}_INPUT_SIZE {}\n".format(prefix, self.x_input * self.y_input * self.input_ch))
                f.write("#define {}_OUT_ACTIVATION_MIN {}\n".format(prefix, self.out_activation_min))
                f.write("#define {}_OUT_ACTIVATION_MAX {}\n".format(prefix, self.out_activation_max))
                f.write("#define {}_INPUT_BATCHES {}\n".format(prefix, self.batches))
        self.format_output_file(filepath)

    def get_data_file_name_info(self, name_prefix) -> (str, str):
        """Return (filename, filepath) of the data header for name_prefix."""
        filename = name_prefix + "_data.h"
        filepath = self.headers_dir + filename
        return filename, filepath

    def generate_c_array(self, name, array, datatype="int8_t", const="const ", pack=False) -> None:
        """Write array as a C array definition to its own header and register that header.

        array may be a list, a NumPy array or an object providing .numpy(); non-list inputs are
        flattened. An empty array is emitted as a NULL pointer. With pack set, only the first
        ceil(size / 2) elements are written.
        NOTE(review): pack presumably expects the caller to have already packed two values per
        element - confirm against callers.
        """
        if isinstance(array, list):
            values = array
        elif isinstance(array, np.ndarray):
            values = array.ravel()
        else:
            values = array.numpy().ravel()
        size = len(values)

        if pack:
            size = (size // 2) + (size % 2)

        filename, filepath = self.get_data_file_name_info(name)
        self.generated_header_files.append(filename)

        print("Generating C header {}...".format(filepath))
        with open(filepath, "w+") as f:
            self.write_c_common_header(f)
            f.write("#include <stdint.h>\n\n")
            symbol = self.testdataset + '_' + name
            if size > 0:
                f.write(const + datatype + " " + symbol + "[%d] =\n{\n" % size)
                # One element per line; the final layout is normalized by clang-format below.
                f.write(",\n".join(" %d" % value for value in values[:size]))
                f.write("\n")
                f.write("};\n")
            else:
                f.write(const + datatype + " *" + symbol + " = NULL;\n")
        self.format_output_file(filepath)

    def calculate_padding(self, x_output, y_output, x_input, y_input):
        """Compute and store pad_x/pad_y and their *_with_offset variants for the given sizes.

        All four values are zero when padding is disabled. The *_with_offset values add the
        remainder when the total padding along a dimension is odd.
        """
        if not self.has_padding:
            self.pad_x = 0
            self.pad_y = 0
            self.pad_y_with_offset = 0
            self.pad_x_with_offset = 0
            return

        # Take dilation into account.
        filter_x = (self.filter_x - 1) * self.dilation_x + 1
        filter_y = (self.filter_y - 1) * self.dilation_y + 1

        pad_along_width = max((x_output - 1) * self.stride_x + filter_x - x_input, 0)
        pad_along_height = max((y_output - 1) * self.stride_y + filter_y - y_input, 0)

        pad_top, pad_top_offset = divmod(pad_along_height, 2)
        pad_left, pad_left_offset = divmod(pad_along_width, 2)

        self.pad_y_with_offset = pad_top + pad_top_offset
        self.pad_x_with_offset = pad_left + pad_left_offset
        self.pad_x = pad_left
        self.pad_y = pad_top

    @abstractmethod
    def generate_data(self, input_data=None, weights=None, biases=None) -> None:
        """Generate the test data. Must be overridden by concrete generators."""

    def quantize_scale(self, scale):
        """Split scale into a Q31 significand and a power-of-two shift.

        Returns (significand_q31, shift) such that scale ~= significand_q31 * 2**(shift - 31).
        """
        significand, shift = math.frexp(scale)
        significand_q31 = round(significand * (1 << 31))
        # Rounding can push the significand up to exactly 2**31, which does not fit in a signed
        # 32-bit value. Halve it and bump the shift; the represented value is unchanged.
        if significand_q31 == (1 << 31):
            significand_q31 //= 2
            shift += 1
        return significand_q31, shift

    def get_calib_data_func(self, n_inputs, shape):
        """Return a generator function yielding one calibration sample: n_inputs all-ones float32 arrays.

        The returned generator raises RuntimeError on first use if n_inputs is not positive.
        """

        def representative_data_gen():
            if n_inputs <= 0:
                raise RuntimeError("Invalid number of representative test sets: {}. Must be more than 0".format(
                    n_inputs))
            yield [np.ones(shape, dtype=np.float32) for _ in range(n_inputs)]

        return representative_data_gen

    def convert_and_interpret(self, model, inttype, input_data=None, dataset_shape=None):
        """
        Compile and convert a model to Tflite format, run interpreter and allocate tensors.
        """
        self.convert_model(model, inttype, dataset_shape)
        return self.interpret_model(input_data, inttype)

    def convert_model(self, model, inttype, dataset_shape=None, int16x8_int32bias=False):
        """Compile model, convert it to a quantized TFLite model and write it to model_path_tflite.

        inttype is used as both inference input and output type. dataset_shape overrides the
        shape of the calibration data, which otherwise is (batches, y_input, x_input, input_ch).
        int16x8_int32bias only has an effect for int16xint8 settings.
        """
        model.compile(loss=keras.losses.categorical_crossentropy,
                      optimizer=keras.optimizers.Adam(),
                      metrics=['accuracy'])
        n_inputs = len(model.inputs)

        if dataset_shape:
            representative_dataset_shape = dataset_shape
        else:
            representative_dataset_shape = (self.batches, self.y_input, self.x_input, self.input_ch)

        converter = tf.lite.TFLiteConverter.from_keras_model(model)

        representative_dataset = self.get_calib_data_func(n_inputs, representative_dataset_shape)

        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.representative_dataset = representative_dataset
        if self.is_int16xint8:
            if int16x8_int32bias:
                converter._experimental_full_integer_quantization_bias_type = tf.int32
            converter.target_spec.supported_ops = [
                tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8
            ]
        else:
            converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
        converter.inference_input_type = inttype
        converter.inference_output_type = inttype
        tflite_model = converter.convert()

        os.makedirs(os.path.dirname(self.model_path_tflite), exist_ok=True)
        with open(self.model_path_tflite, "wb") as model_file:
            model_file.write(tflite_model)

    def interpret_model(self, input_data, inttype):
        """Load the converted model into an interpreter, allocate tensors and optionally set the input.

        Stores the output quantization parameters, and the input ones when input_data is given.
        Returns the interpreter; it is not invoked here.
        """
        interpreter = self.Interpreter(model_path=str(self.model_path_tflite),
                                       experimental_op_resolver_type=self.OpResolverType.BUILTIN_REF)
        interpreter.allocate_tensors()

        output_details = interpreter.get_output_details()
        (self.output_scale, self.output_zero_point) = output_details[0]['quantization']

        if input_data is not None:
            input_details = interpreter.get_input_details()
            (self.input_scale, self.input_zero_point) = input_details[0]['quantization']

            # Set input tensors
            interpreter.set_tensor(input_details[0]["index"], tf.cast(input_data, inttype))

        return interpreter

    # TODO: make it a more generic function and remove reference to svdf specific names
    def generate_json_from_template(self,
                                    weights_feature_data=None,
                                    weights_time_data=None,
                                    bias_data=None,
                                    int8_time_weights=False,
                                    bias_buffer=3):
        """
        Takes a json template and parameters as input and creates a new json file.

        Reads self.json_template and applies self.json_replacements (both expected to be set by
        the concrete generator). Feature weights go to buffer 1 as 1-byte values, time weights to
        buffer 2 as 1-byte (int8_time_weights) or 2-byte values, and bias to buffer bias_buffer
        as 4-byte values. Returns the path of the generated json file.
        """
        generated_json_file = self.model_path + '.json'

        with open(self.json_template, 'r') as in_file:
            data = in_file.read()

        # Update shapes, scales and zero points
        for item, to_replace in self.json_replacements.items():
            data = data.replace(item, str(to_replace))

        data = json.loads(data)

        # Update weights and bias data
        if weights_feature_data is not None:
            w_1_buffer_index = 1
            data["buffers"][w_1_buffer_index]["data"] = self.to_bytes(weights_feature_data.numpy().ravel(), 1)
        if weights_time_data is not None:
            w_2_buffer_index = 2
            time_weight_size = 1 if int8_time_weights else 2
            data["buffers"][w_2_buffer_index]["data"] = self.to_bytes(weights_time_data.numpy().ravel(),
                                                                      time_weight_size)

        if bias_data is not None:
            bias_buffer_index = bias_buffer
            data["buffers"][bias_buffer_index]["data"] = self.to_bytes(bias_data.numpy().ravel(), 4)

        # Only create the output once the template has been read and processed successfully.
        with open(generated_json_file, 'w') as out_file:
            json.dump(data, out_file, indent=2)

        return generated_json_file

    def flatc_generate_tflite(self, json_input, schema):
        """Run flatc to build a binary tflite file from json_input, writing into headers_dir.

        Exits the process with status 1 if flatc returns a non-zero code.

        Raises:
            RuntimeError: if schema is None or flatc could not be started.
        """
        flatc = 'flatc'
        if schema is None:
            raise RuntimeError("A schema file is required.")
        # The string form is only used for messages; the argument list is built directly so that
        # paths containing spaces are passed through intact.
        command = "{} -o {} -c -b {} {}".format(flatc, self.headers_dir, schema, json_input)
        command_list = [flatc, '-o', str(self.headers_dir), '-c', '-b', str(schema), str(json_input)]
        try:
            process = subprocess.run(command_list)
        except Exception as e:
            raise RuntimeError(f"{e} from: {command = }. Did you install flatc?") from e
        if process.returncode != 0:
            print(f"ERROR: {command = }")
            sys.exit(1)

    def to_bytes(self, tensor_data, type_size) -> list:
        """Return the little-endian bytes of every value in tensor_data as a flat list of ints.

        Each value is truncated to an integer and encoded as a type_size-byte two's-complement
        number. A list of ints (rather than a bytes object) is returned so the result can be
        serialized with json.

        Raises:
            RuntimeError: if type_size is not 1, 2 or 4.
        """
        if type_size not in (1, 2, 4):
            raise RuntimeError("Size not supported: {}".format(type_size))

        # Masking gives the unsigned two's-complement representation without relying on
        # negative-to-unsigned NumPy casts.
        mask = (1 << (8 * type_size)) - 1

        result_bytes = []
        for val in tensor_data:
            result_bytes.extend((int(val) & mask).to_bytes(type_size, 'little'))

        return result_bytes