1#!/usr/bin/env python3 2# 3# Copyright (c) 2021 Intel Corporation 4# 5# SPDX-License-Identifier: Apache-2.0 6 7""" 8Class for Dictionary-based Logging Database 9""" 10 11import base64 12import copy 13import json 14 15from .mipi_syst import gen_syst_xml_file 16from .utils import extract_one_string_in_section, find_string_in_mappings 17 18ARCHS = { 19 "arc": { 20 "kconfig": "CONFIG_ARC", 21 }, 22 "arm": { 23 "kconfig": "CONFIG_ARM", 24 }, 25 "arm64": { 26 "kconfig": "CONFIG_ARM64", 27 }, 28 "mips": { 29 "kconfig": "CONFIG_MIPS", 30 }, 31 "sparc": { 32 "kconfig": "CONFIG_SPARC", 33 }, 34 "x86": { 35 "kconfig": "CONFIG_X86", 36 }, 37 "posix": { 38 "kconfig": "CONFIG_ARCH_POSIX", 39 }, 40 "riscv32e": { 41 "kconfig": "CONFIG_RISCV_ISA_RV32E", 42 }, 43 "riscv": { 44 "kconfig": "CONFIG_RISCV", 45 }, 46 "rx": { 47 "kconfig": "CONFIG_RX", 48 }, 49 "xtensa": { 50 "kconfig": "CONFIG_XTENSA", 51 }, 52} 53 54 55class LogDatabase: 56 """Class of log database""" 57 58 # Update this if database format of dictionary based logging 59 # has changed 60 ZEPHYR_DICT_LOG_VER = 3 61 62 LITTLE_ENDIAN = True 63 BIG_ENDIAN = False 64 65 def __init__(self): 66 new_db = {} 67 68 new_db['version'] = self.ZEPHYR_DICT_LOG_VER 69 new_db['target'] = {} 70 new_db['log_subsys'] = {} 71 new_db['log_subsys']['log_instances'] = {} 72 new_db['build_id'] = None 73 new_db['arch'] = None 74 new_db['kconfigs'] = {} 75 76 self.database = new_db 77 78 def get_version(self): 79 """Get Database Version""" 80 return self.database['version'] 81 82 def get_build_id(self): 83 """Get Build ID""" 84 return self.database['build_id'] 85 86 def set_build_id(self, build_id): 87 """Set Build ID in Database""" 88 self.database['build_id'] = build_id 89 90 def get_arch(self): 91 """Get the Target Architecture""" 92 return self.database['arch'] 93 94 def set_arch(self, arch): 95 """Set the Target Architecture""" 96 self.database['arch'] = arch 97 98 def get_tgt_bits(self): 99 """Get Target Bitness: 32 or 64""" 100 if 'bits' in self.database['target']: 101 return self.database['target']['bits'] 102 103 return None 104 105 def set_tgt_bits(self, bits): 106 """Set Target Bitness: 32 or 64""" 107 self.database['target']['bits'] = bits 108 109 def is_tgt_64bit(self): 110 """Return True if target is 64-bit, False if 32-bit. 111 None if error.""" 112 if 'bits' not in self.database['target']: 113 return None 114 115 if self.database['target']['bits'] == 32: 116 return False 117 118 if self.database['target']['bits'] == 64: 119 return True 120 121 return None 122 123 def get_tgt_endianness(self): 124 """ 125 Get Target Endianness. 126 127 Return True if little endian, False if big. 128 """ 129 if 'little_endianness' in self.database['target']: 130 return self.database['target']['little_endianness'] 131 132 return None 133 134 def set_tgt_endianness(self, endianness): 135 """ 136 Set Target Endianness 137 138 True if little endian, False if big. 139 """ 140 self.database['target']['little_endianness'] = endianness 141 142 def is_tgt_little_endian(self): 143 """Return True if target is little endian""" 144 if 'little_endianness' not in self.database['target']: 145 return None 146 147 return self.database['target']['little_endianness'] == self.LITTLE_ENDIAN 148 149 def get_string_mappings(self): 150 """Get string mappings to database""" 151 return self.database['string_mappings'] 152 153 def set_string_mappings(self, database): 154 """Add string mappings to database""" 155 self.database['string_mappings'] = database 156 157 def has_string_mappings(self): 158 """Return True if there are string mappings in database""" 159 return 'string_mappings' in self.database 160 161 def has_string_sections(self): 162 """Return True if there are any static string sections""" 163 if 'sections' not in self.database: 164 return False 165 166 return len(self.database['sections']) != 0 167 168 def __find_string_in_mappings(self, string_ptr): 169 """ 170 Find string pointed by string_ptr in the string mapping 171 list. Return None if not found. 172 """ 173 return find_string_in_mappings(self.database['string_mappings'], string_ptr) 174 175 def __find_string_in_sections(self, string_ptr): 176 """ 177 Find string pointed by string_ptr in the binary data 178 sections. Return None if not found. 179 """ 180 for _, sect in self.database['sections'].items(): 181 one_str = extract_one_string_in_section(sect, string_ptr) 182 183 if one_str is not None: 184 return one_str 185 186 return None 187 188 def find_string(self, string_ptr): 189 """Find string pointed by string_ptr in the database. 190 Return None if not found.""" 191 one_str = None 192 193 if self.has_string_mappings(): 194 one_str = self.__find_string_in_mappings(string_ptr) 195 196 if one_str is None and self.has_string_sections(): 197 one_str = self.__find_string_in_sections(string_ptr) 198 199 return one_str 200 201 def add_log_instance(self, source_id, name, level, address): 202 """Add one log instance into database""" 203 self.database['log_subsys']['log_instances'][source_id] = { 204 'source_id': source_id, 205 'name': name, 206 'level': level, 207 'addr': address, 208 } 209 210 def get_log_source_string(self, domain_id, source_id): 211 """Get the source string based on source ID""" 212 # JSON stores key as string, so we need to convert 213 src_id = str(source_id) 214 215 if src_id in self.database['log_subsys']['log_instances']: 216 return self.database['log_subsys']['log_instances'][src_id]['name'] 217 218 return f"unknown<{domain_id}:{source_id}>" 219 220 def add_kconfig(self, name, val): 221 """Add a kconfig name-value pair into database""" 222 self.database['kconfigs'][name] = val 223 224 def get_kconfigs(self): 225 """Return kconfig name-value pairs""" 226 return self.database['kconfigs'] 227 228 @staticmethod 229 def read_json_database(db_file_name): 230 """Read database from file and return a LogDatabase object""" 231 try: 232 with open(db_file_name, encoding="iso-8859-1") as db_fd: 233 json_db = json.load(db_fd) 234 except (OSError, json.JSONDecodeError): 235 return None 236 237 # Decode data in JSON back into binary data 238 if 'sections' in json_db: 239 for _, sect in json_db['sections'].items(): 240 sect['data'] = base64.b64decode(sect['data_b64']) 241 242 database = LogDatabase() 243 database.database = json_db 244 245 # JSON encodes the addresses in string mappings as literal strings. 246 # So convert them back to integers, as this is needed for partial 247 # matchings. 248 if database.has_string_mappings(): 249 new_str_map = {} 250 251 for addr, one_str in database.get_string_mappings().items(): 252 new_str_map[int(addr)] = one_str 253 254 database.set_string_mappings(new_str_map) 255 256 return database 257 258 @staticmethod 259 def write_json_database(db_file_name, database): 260 """Write the database into file""" 261 json_db = copy.deepcopy(database.database) 262 263 # Make database object into something JSON can dump 264 if 'sections' in json_db: 265 for _, sect in json_db['sections'].items(): 266 encoded = base64.b64encode(sect['data']) 267 sect['data_b64'] = encoded.decode('ascii') 268 del sect['data'] 269 270 try: 271 with open(db_file_name, "w", encoding="iso-8859-1") as db_fd: 272 db_fd.write(json.dumps(json_db)) 273 except OSError: 274 return False 275 276 return True 277 278 @staticmethod 279 def write_syst_database(db_file_name, database): 280 """ 281 Write the database into MIPI Sys-T Collateral XML file 282 """ 283 284 try: 285 with open(db_file_name, "w", encoding="iso-8859-1") as db_fd: 286 xml = gen_syst_xml_file(database) 287 db_fd.write(xml) 288 except OSError: 289 return False 290 291 return True 292