1#!/usr/bin/env python3 2# 3# Copyright (c) 2021, 2024 Intel Corporation 4# 5# SPDX-License-Identifier: Apache-2.0 6 7# Parsers are gonig to have very similar code. 8# So tell pylint not to care. 9# pylint: disable=duplicate-code 10 11""" 12Dictionary-based Logging Parser Version 3 13 14This contains the implementation of the parser for 15version 3 databases. 16""" 17 18import logging 19import struct 20import colorama 21from colorama import Fore 22 23from .log_parser import (LogParser, get_log_level_str_color, formalize_fmt_string) 24from .data_types import DataTypes 25 26 27HEX_BYTES_IN_LINE = 16 28 29# Need to keep sync with struct log_dict_output_msg_hdr in 30# include/logging/log_output_dict.h. 31# 32# struct log_dict_output_normal_msg_hdr_t { 33# uint8_t type; 34# uint32_t domain:4; 35# uint32_t level:4; 36# uint32_t package_len:16; 37# uint32_t data_len:16; 38# uintptr_t source; 39# log_timestamp_t timestamp; 40# } __packed; 41# 42# Note "type" and "timestamp" are encoded separately below. 43FMT_MSG_HDR_32 = "BHHI" 44FMT_MSG_HDR_64 = "BHHQ" 45 46# Message type 47# 0: normal message 48# 1: number of dropped messages 49FMT_MSG_TYPE = "B" 50 51# Depends on CONFIG_LOG_TIMESTAMP_64BIT 52FMT_MSG_TIMESTAMP_32 = "I" 53FMT_MSG_TIMESTAMP_64 = "Q" 54 55# Keep message types in sync with include/logging/log_output_dict.h 56MSG_TYPE_NORMAL = 0 57MSG_TYPE_DROPPED = 1 58 59# Number of dropped messages 60FMT_DROPPED_CNT = "H" 61 62 63logger = logging.getLogger("parser") 64 65 66class LogParserV3(LogParser): 67 """Log Parser V1""" 68 def __init__(self, database): 69 super().__init__(database=database) 70 71 if self.database.is_tgt_little_endian(): 72 endian = "<" 73 self.is_big_endian = False 74 else: 75 endian = ">" 76 self.is_big_endian = True 77 78 self.fmt_msg_type = endian + FMT_MSG_TYPE 79 self.fmt_dropped_cnt = endian + FMT_DROPPED_CNT 80 81 if self.database.is_tgt_64bit(): 82 self.fmt_msg_hdr = endian + FMT_MSG_HDR_64 83 else: 84 self.fmt_msg_hdr = endian + FMT_MSG_HDR_32 85 86 if "CONFIG_LOG_TIMESTAMP_64BIT" in self.database.get_kconfigs(): 87 self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_64 88 else: 89 self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_32 90 91 92 def __get_string(self, arg, arg_offset, string_tbl): 93 one_str = self.database.find_string(arg) 94 if one_str is not None: 95 ret = one_str 96 else: 97 # The index from the string table is basically 98 # the order in va_list. Need to add to the index 99 # to skip the packaged string header and 100 # the format string. 101 str_idx = arg_offset + self.data_types.get_sizeof(DataTypes.PTR) * 2 102 str_idx /= self.data_types.get_sizeof(DataTypes.INT) 103 104 if int(str_idx) not in string_tbl: 105 ret = f'<string@0x{arg:x}>' 106 else: 107 ret = string_tbl[int(str_idx)] 108 109 return ret 110 111 112 def process_one_fmt_str(self, fmt_str, arg_list, string_tbl): 113 """Parse the format string to extract arguments from 114 the binary arglist and return a tuple usable with 115 Python's string formatting""" 116 idx = 0 117 arg_offset = 0 118 arg_data_type = None 119 is_parsing = False 120 do_extract = False 121 122 args = [] 123 124 # Translated from cbvprintf_package() 125 for idx, fmt in enumerate(fmt_str): 126 if not is_parsing: 127 if fmt == '%': 128 is_parsing = True 129 arg_data_type = DataTypes.INT 130 continue 131 132 elif fmt == '%': 133 # '%%' -> literal percentage sign 134 is_parsing = False 135 continue 136 137 elif fmt == '*': 138 pass 139 140 elif fmt.isdecimal() or str.lower(fmt) == 'l' \ 141 or fmt in (' ', '#', '-', '+', '.', 'h'): 142 # formatting modifiers, just ignore 143 continue 144 145 elif fmt in ('j', 'z', 't'): 146 # intmax_t, size_t or ptrdiff_t 147 arg_data_type = DataTypes.LONG 148 149 elif fmt in ('c', 'd', 'i', 'o', 'u', 'x', 'X'): 150 unsigned = fmt in ('c', 'o', 'u', 'x', 'X') 151 152 if fmt_str[idx - 1] == 'l': 153 if fmt_str[idx - 2] == 'l': 154 arg_data_type = DataTypes.ULONG_LONG if unsigned else DataTypes.LONG_LONG 155 else: 156 arg_data_type = DataTypes.ULONG if unsigned else DataTypes.LONG 157 else: 158 arg_data_type = DataTypes.UINT if unsigned else DataTypes.INT 159 160 is_parsing = False 161 do_extract = True 162 163 elif fmt in ('s', 'p', 'n'): 164 arg_data_type = DataTypes.PTR 165 166 is_parsing = False 167 do_extract = True 168 169 elif str.lower(fmt) in ('a', 'e', 'f', 'g'): 170 # Python doesn't do"long double". 171 # 172 # Parse it as double (probably incorrect), but 173 # still have to skip enough bytes. 174 if fmt_str[idx - 1] == 'L': 175 arg_data_type = DataTypes.LONG_DOUBLE 176 else: 177 arg_data_type = DataTypes.DOUBLE 178 179 is_parsing = False 180 do_extract = True 181 182 else: 183 is_parsing = False 184 continue 185 186 if do_extract: 187 do_extract = False 188 189 align = self.data_types.get_alignment(arg_data_type) 190 size = self.data_types.get_sizeof(arg_data_type) 191 unpack_fmt = self.data_types.get_formatter(arg_data_type) 192 193 # Align the argument list by rounding up 194 stack_align = self.data_types.get_stack_alignment(arg_data_type) 195 if stack_align > 1: 196 arg_offset = int((arg_offset + (align - 1)) / align) * align 197 198 one_arg = struct.unpack_from(unpack_fmt, arg_list, arg_offset)[0] 199 200 if fmt == 's': 201 one_arg = self.__get_string(one_arg, arg_offset, string_tbl) 202 203 args.append(one_arg) 204 arg_offset += size 205 206 # Align the offset 207 if stack_align > 1: 208 arg_offset = int((arg_offset + align - 1) / align) * align 209 210 return tuple(args) 211 212 213 @staticmethod 214 def extract_string_table(str_tbl): 215 """Extract string table in a packaged log message""" 216 tbl = {} 217 218 one_str = "" 219 next_new_string = True 220 # Translated from cbvprintf_package() 221 for one_ch in str_tbl: 222 if next_new_string: 223 str_idx = one_ch 224 next_new_string = False 225 continue 226 227 if one_ch == 0: 228 tbl[str_idx] = one_str 229 one_str = "" 230 next_new_string = True 231 continue 232 233 one_str += chr(one_ch) 234 235 return tbl 236 237 238 @staticmethod 239 def print_hexdump(hex_data, prefix_len, color): 240 """Print hex dump""" 241 hex_vals = "" 242 chr_vals = "" 243 chr_done = 0 244 245 for one_hex in hex_data: 246 hex_vals += f'{one_hex:02x} ' 247 chr_vals += chr(one_hex) 248 chr_done += 1 249 250 if chr_done == HEX_BYTES_IN_LINE / 2: 251 hex_vals += " " 252 chr_vals += " " 253 254 elif chr_done == HEX_BYTES_IN_LINE: 255 print(f"{color}%s%s|%s{Fore.RESET}" % ((" " * prefix_len), 256 hex_vals, chr_vals)) 257 hex_vals = "" 258 chr_vals = "" 259 chr_done = 0 260 261 if len(chr_vals) > 0: 262 hex_padding = " " * (HEX_BYTES_IN_LINE - chr_done) 263 print(f"{color}%s%s%s|%s{Fore.RESET}" % ((" " * prefix_len), 264 hex_vals, hex_padding, chr_vals)) 265 266 267 def parse_one_normal_msg(self, logdata, offset): 268 """Parse one normal log message and print the encoded message""" 269 # Parse log message header 270 domain_lvl, pkg_len, data_len, source_id = struct.unpack_from(self.fmt_msg_hdr, 271 logdata, offset) 272 offset += struct.calcsize(self.fmt_msg_hdr) 273 274 timestamp = struct.unpack_from(self.fmt_msg_timestamp, logdata, offset)[0] 275 offset += struct.calcsize(self.fmt_msg_timestamp) 276 277 # domain_id, level 278 if self.is_big_endian: 279 level = domain_lvl & 0x0F 280 domain_id = (domain_lvl >> 4) & 0x0F 281 else: 282 domain_id = domain_lvl & 0x0F 283 level = (domain_lvl >> 4) & 0x0F 284 285 level_str, color = get_log_level_str_color(level) 286 source_id_str = self.database.get_log_source_string(domain_id, source_id) 287 288 # Skip over data to point to next message (save as return value) 289 next_msg_offset = offset + pkg_len + data_len 290 291 # Offset from beginning of cbprintf_packaged data to end of va_list arguments 292 offset_end_of_args = struct.unpack_from("B", logdata, offset)[0] 293 offset_end_of_args *= self.data_types.get_sizeof(DataTypes.INT) 294 offset_end_of_args += offset 295 296 # Extra data after packaged log 297 extra_data = logdata[(offset + pkg_len):next_msg_offset] 298 299 # Number of appended strings in package 300 num_packed_strings = struct.unpack_from("B", logdata, offset+1)[0] 301 302 # Number of read-only string indexes 303 num_ro_str_indexes = struct.unpack_from("B", logdata, offset+2)[0] 304 offset_end_of_args += num_ro_str_indexes 305 306 # Number of read-write string indexes 307 num_rw_str_indexes = struct.unpack_from("B", logdata, offset+3)[0] 308 offset_end_of_args += num_rw_str_indexes 309 310 # Extract the string table in the packaged log message 311 string_tbl = self.extract_string_table(logdata[offset_end_of_args:(offset + pkg_len)]) 312 313 if len(string_tbl) != num_packed_strings: 314 logger.error("------ Error extracting string table") 315 return None 316 317 # Skip packaged string header 318 offset += self.data_types.get_sizeof(DataTypes.PTR) 319 320 # Grab the format string 321 # 322 # Note the negative offset to __get_string(). It is because 323 # the offset begins at 0 for va_list. However, the format string 324 # itself is before the va_list, so need to go back the width of 325 # a pointer. 326 fmt_str_ptr = struct.unpack_from(self.data_types.get_formatter(DataTypes.PTR), 327 logdata, offset)[0] 328 fmt_str = self.__get_string(fmt_str_ptr, 329 -self.data_types.get_sizeof(DataTypes.PTR), 330 string_tbl) 331 offset += self.data_types.get_sizeof(DataTypes.PTR) 332 333 if not fmt_str: 334 logger.error("------ Error getting format string at 0x%x", fmt_str_ptr) 335 return None 336 337 args = self.process_one_fmt_str(fmt_str, logdata[offset:offset_end_of_args], string_tbl) 338 339 fmt_str = formalize_fmt_string(fmt_str) 340 log_msg = fmt_str % args 341 342 if level == 0: 343 print(f"{log_msg}", end='') 344 log_prefix = "" 345 else: 346 log_prefix = f"[{timestamp:>10}] <{level_str}> {source_id_str}: " 347 print(f"{color}%s%s{Fore.RESET}" % (log_prefix, log_msg)) 348 349 if data_len > 0: 350 # Has hexdump data 351 self.print_hexdump(extra_data, len(log_prefix), color) 352 353 # Point to next message 354 return next_msg_offset 355 356 357 def parse_log_data(self, logdata, debug=False): 358 """Parse binary log data and print the encoded log messages""" 359 offset = 0 360 361 while offset < len(logdata): 362 # Get message type 363 msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0] 364 offset += struct.calcsize(self.fmt_msg_type) 365 366 if msg_type == MSG_TYPE_DROPPED: 367 num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset) 368 offset += struct.calcsize(self.fmt_dropped_cnt) 369 370 print(f"--- {num_dropped} messages dropped ---") 371 372 elif msg_type == MSG_TYPE_NORMAL: 373 ret = self.parse_one_normal_msg(logdata, offset) 374 if ret is None: 375 return False 376 377 offset = ret 378 379 else: 380 logger.error("------ Unknown message type: %s", msg_type) 381 return False 382 383 return True 384 385colorama.init() 386